To get the top N of each group, first group the data by key, then sort within each group and take the top N.
Test data
hadoop 23
spark 45
java 90
spark 57
spark 90
hadoop 99
hadoop 76
spark 45
spark 88
spark 89
hadoop 45
hadoop 90
java 78
java 70
1.1 Step 1: Convert the source data into (key, value) pairs so it can be grouped by key
SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCount");
// JavaSparkContext wraps the underlying SparkContext
JavaSparkContext jsc = new JavaSparkContext(conf);
// Read the file; each line becomes one String element of the RDD (backed by a HadoopRDD)
JavaRDD<String> lines = jsc.textFile("C:\\Users\\12285\\Desktop\\test");
// Split each line into (subject, score)
JavaPairRDD<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
    private static final long serialVersionUID = 1L;
    public Tuple2<String, Integer> call(String line) throws Exception {
        String[] parts = line.split(" ");
        return new Tuple2<String, Integer>(parts[0], Integer.valueOf(parts[1]));
    }
});
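For reference, on Java 8 the same pairing step can be written with a lambda. This is a minimal sketch, assuming a Spark version whose Java function types accept Java 8 lambdas:

JavaPairRDD<String, Integer> pairs = lines.mapToPair(line -> {
    String[] parts = line.split(" ");
    // subject as key, score as value
    return new Tuple2<>(parts[0], Integer.valueOf(parts[1]));
});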
1.2 Step 2: Group by key
// Group the scores by subject
JavaPairRDD<String, Iterable<Integer>> groupPairs = pairs.groupByKey();
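For the test data above, the grouped RDD conceptually holds one entry per subject, for example (the order of values within a group is not guaranteed):

(hadoop, [23, 99, 76, 45, 90])
(spark, [45, 57, 90, 45, 88, 89])
(java, [90, 78, 70])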
1.3 Step 3: Sort within each group
1.3.1 After grouping, the RDD is a JavaPairRDD<String, Iterable<Integer>> whose key is the subject name and whose value is the collection of scores, so we use mapToPair and sort within each group by insertion into a fixed-size array.
JavaPairRDD<String, Iterable<Integer>> top5Pairs = groupPairs.mapToPair(
        new PairFunction<Tuple2<String, Iterable<Integer>>, String, Iterable<Integer>>() {
    private static final long serialVersionUID = 1L;
    public Tuple2<String, Iterable<Integer>> call(Tuple2<String, Iterable<Integer>> groupedPair) throws Exception {
        // Holds the 5 largest scores seen so far, in descending order;
        // unused slots stay null when a group has fewer than 5 scores.
        Integer[] top5 = new Integer[5];
        String groupedKey = groupedPair._1;
        Iterator<Integer> groupedValue = groupedPair._2.iterator();
        while (groupedValue.hasNext()) {
            Integer value = groupedValue.next();
            for (int i = 0; i < top5.length; i++) {
                if (top5[i] == null) {
                    top5[i] = value;
                    break;
                } else if (top5[i] < value) { // value is larger than the entry at index i
                    // shift everything from index i onward one slot to the right
                    for (int j = top5.length - 1; j > i; j--) {
                        top5[j] = top5[j - 1];
                    }
                    top5[i] = value;
                    break;
                }
                // otherwise the entry at index i is >= value, keep scanning
            }
        }
        return new Tuple2<String, Iterable<Integer>>(groupedKey, Arrays.asList(top5));
    }
});
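As an alternative to the hand-written insertion sort, the top 5 can be kept in a bounded min-heap: offer every score into a PriorityQueue and evict the smallest element whenever the queue grows past 5. A minimal sketch (it additionally needs java.util.PriorityQueue, java.util.ArrayList, java.util.List, and java.util.Collections imported):

JavaPairRDD<String, Iterable<Integer>> top5Heap = groupPairs.mapToPair(
        new PairFunction<Tuple2<String, Iterable<Integer>>, String, Iterable<Integer>>() {
    private static final long serialVersionUID = 1L;
    public Tuple2<String, Iterable<Integer>> call(Tuple2<String, Iterable<Integer>> groupedPair) throws Exception {
        // Min-heap holding at most 5 scores; its head is the smallest score kept so far.
        PriorityQueue<Integer> heap = new PriorityQueue<Integer>();
        for (Integer value : groupedPair._2) {
            heap.offer(value);
            if (heap.size() > 5) {
                heap.poll(); // evict the smallest, so only the 5 largest remain
            }
        }
        List<Integer> top5 = new ArrayList<Integer>(heap);
        // Sort descending so the largest score comes first in the output.
        Collections.sort(top5, Collections.reverseOrder());
        return new Tuple2<String, Iterable<Integer>>(groupedPair._1, top5);
    }
});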
1.4 Complete code
package com.chb.sparkDemo.TopNGroup;

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class TopNGroupTest {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCount");
        // JavaSparkContext wraps the underlying SparkContext
        JavaSparkContext jsc = new JavaSparkContext(conf);
        // Read the file; each line becomes one String element of the RDD (backed by a HadoopRDD)
        JavaRDD<String> lines = jsc.textFile("C:\\Users\\12285\\Desktop\\test");
        // Split each line into (subject, score)
        JavaPairRDD<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;
            public Tuple2<String, Integer> call(String line) throws Exception {
                String[] parts = line.split(" ");
                return new Tuple2<String, Integer>(parts[0], Integer.valueOf(parts[1]));
            }
        });
        // Group the scores by subject
        JavaPairRDD<String, Iterable<Integer>> groupPairs = pairs.groupByKey();
        JavaPairRDD<String, Iterable<Integer>> top5Pairs = groupPairs.mapToPair(
                new PairFunction<Tuple2<String, Iterable<Integer>>, String, Iterable<Integer>>() {
            private static final long serialVersionUID = 1L;
            public Tuple2<String, Iterable<Integer>> call(Tuple2<String, Iterable<Integer>> groupedPair) throws Exception {
                // Holds the 5 largest scores seen so far, in descending order;
                // unused slots stay null when a group has fewer than 5 scores.
                Integer[] top5 = new Integer[5];
                String groupedKey = groupedPair._1;
                Iterator<Integer> groupedValue = groupedPair._2.iterator();
                while (groupedValue.hasNext()) {
                    Integer value = groupedValue.next();
                    for (int i = 0; i < top5.length; i++) {
                        if (top5[i] == null) {
                            top5[i] = value;
                            break;
                        } else if (top5[i] < value) { // value is larger than the entry at index i
                            // shift everything from index i onward one slot to the right
                            for (int j = top5.length - 1; j > i; j--) {
                                top5[j] = top5[j - 1];
                            }
                            top5[i] = value;
                            break;
                        }
                        // otherwise the entry at index i is >= value, keep scanning
                    }
                }
                return new Tuple2<String, Iterable<Integer>>(groupedKey, Arrays.asList(top5));
            }
        });
        top5Pairs.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
            private static final long serialVersionUID = 1L;
            public void call(Tuple2<String, Iterable<Integer>> pair) throws Exception {
                String groupedKey = pair._1;
                System.out.println("Grouped Key : " + groupedKey);
                Iterator<Integer> groupedValue = pair._2.iterator();
                while (groupedValue.hasNext()) {
                    Integer value = groupedValue.next();
                    if (value != null) { // skip unused slots of groups with fewer than 5 scores
                        System.out.println(value);
                    }
                }
                System.out.println("==================================");
            }
        });
        jsc.stop();
    }
}
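With the test data above, running the program locally should print something like the following (the order of the groups may vary):

Grouped Key : spark
90
89
88
57
45
==================================
Grouped Key : hadoop
99
90
76
45
23
==================================
Grouped Key : java
90
78
70
==================================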