您当前的位置: 首页 >  ar

宝哥大数据

暂无认证

  • 0浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Spark--分组TopN

宝哥大数据 发布时间:2017-05-30 23:09:08 ,浏览量:0

为了获取每个分组的topN, 首先要进行分组, 再对每个分组进行排序,获取TopN。 测试数据
hadoop 23
spark 45
java 90
spark 57
spark 90
hadoop 99
hadoop 76
spark 45
spark 88
spark 89
hadoop 45
hadoop 90
java 78
java 70
1.1、第一步, 将源数据转化为(key, value)格式,便于按照key分组
        SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCount");
        //内部实际调用的SparkContext
        JavaSparkContext jsc = new JavaSparkContext(conf);
        //读取文件,将每行数据转换为
        JavaRDD lines = jsc.textFile("C:\\Users\\12285\\Desktop\\test");//hadoopRDD

        JavaPairRDD pairs = lines.mapToPair(new PairFunction() {

            private static final long serialVersionUID = 1L;

            public Tuple2 call(String line) throws Exception {
                return new Tuple2(line.split(" ")[0], Integer.valueOf(line.split(" ")[1]));
            }
        });
1.2、第二步,按照key分组
        //按照科目分组
        JavaPairRDD groupPairs = pairs.groupByKey();
1.3、第三步,组内进行排序 1.3.1、由于分组后的RDD为JavaPairRDD, key为科目名称, value为成绩的集合,所以将使用mapToPair, 在组内进行排序

        JavaPairRDD top5Pairs = groupPairs.mapToPair(new PairFunction() {

            private static final long serialVersionUID = 1L;

            public Tuple2 call(Tuple2 groupedPair) throws Exception {
                Integer[] top5 = new Integer[5];
                String groupedKey = groupedPair._1;
                Iterator groupedValue = groupedPair._2.iterator();
                while(groupedValue.hasNext()) {
                    Integer value = groupedValue.next();

                    for (int i = 0; i < top5.length; i++) {
                        if (top5[i] == null) {
                            top5[i] = value;
                            break;
                        }else if(top5[i] > value){ //索引处的值比value大
                            //该位置之后的值向后移动
                            for (int j = 4; j > i; j--) {
                                top5[j] = top5[j-1];
                            }
                            top5[i] = value;
                            break;
                        } 
                        //否则, //索引处的值比value小, 在top5总继续向后比较
                    }
                }
                return new Tuple2(groupedKey, Arrays.asList(top5));
            }
        });
1.4、完整代码
package com.chb.sparkDemo.TopNGroup;

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class TopNGroupTest {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCount");
        //内部实际调用的SparkContext
        JavaSparkContext jsc = new JavaSparkContext(conf);
        //读取文件,将每行数据转换为
        JavaRDD lines = jsc.textFile("C:\\Users\\12285\\Desktop\\test");//hadoopRDD

        JavaPairRDD pairs = lines.mapToPair(new PairFunction() {

            private static final long serialVersionUID = 1L;

            public Tuple2 call(String line) throws Exception {
                return new Tuple2(line.split(" ")[0], Integer.valueOf(line.split(" ")[1]));
            }
        });

        //按照科目分组
        JavaPairRDD groupPairs = pairs.groupByKey();

        JavaPairRDD top5Pairs = groupPairs.mapToPair(new PairFunction() {

            private static final long serialVersionUID = 1L;

            public Tuple2 call(Tuple2 groupedPair) throws Exception {
                Integer[] top5 = new Integer[5];
                String groupedKey = groupedPair._1;
                Iterator groupedValue = groupedPair._2.iterator();
                while(groupedValue.hasNext()) {
                    Integer value = groupedValue.next();

                    for (int i = 0; i < top5.length; i++) {
                        if (top5[i] == null) {
                            top5[i] = value;
                            break;
                        }else if(top5[i] > value){ //索引处的值比value大
                            //该位置之后的值向后移动
                            for (int j = 4; j > i; j--) {
                                top5[j] = top5[j-1];
                            }
                            top5[i] = value;
                            break;
                        } 
                        //否则, //索引处的值比value小, 在top5总继续向后比较
                    }
                }
                return new Tuple2(groupedKey, Arrays.asList(top5));
            }
        });

        top5Pairs.foreach(new VoidFunction() {

            private static final long serialVersionUID = 1L;

            public void call(Tuple2 pair) throws Exception {
                String groupedKey = pair._1;
                System.out.println("Grouped Key : " + groupedKey);
                Iterator groupedValue = pair._2.iterator();
                while(groupedValue.hasNext()) {
                    Integer value = groupedValue.next();
                    System.out.println(value);
                }
                System.out.println("==================================");
            }
        });


    }
}
关注
打赏
1587549273
查看更多评论
立即登录/注册

微信扫码登录

0.0396s