import org.apache.spark.{SparkConf, SparkContext}
/**
 * Group first, then sort within each group (Group Top N).
 * sorted:   sorts by the element type's implicit Ordering
 * sortWith: sorts with an explicit comparison function, e.g. (a, b) => a > b
 * sortBy:   sorts by a key extracted from each element, with an optional Ordering
 */
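// Quick plain-Scala illustration of the three methods (illustrative values,
// not from the original program):
//   List(3, 1, 2).sorted              // List(1, 2, 3)  - uses the implicit Ordering[Int]
//   List(3, 1, 2).sortWith(_ > _)     // List(3, 2, 1)  - explicit comparison function
//   List((1, 5), (2, 4)).sortBy(_._2) // List((2, 4), (1, 5)) - sorts by the extracted key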
object GroupTopN {
  val conf = new SparkConf().setMaster("local").setAppName("RDDTest")
  val sc = new SparkContext(conf)

  def main(args: Array[String]): Unit = {
    testSortBy()
    sc.stop()
  }
  def testSorted() = {
    val rdd1 = sc.parallelize(List(("a", 1), ("a", 2), ("a", 3), ("a", 4), ("b", 5), ("b", 6), ("b", 7), ("b", 8)))
    // group by key, then sort each group's values in descending order
    rdd1.groupByKey().mapValues(f => {
      // two ways to sort within a group
      f.toList.sorted.reverse
      //f.toList.sortWith(_ > _)
    }).foreach(println)
  }
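  // Expected output of testSorted (key order may vary across runs):
  //   (a,List(4, 3, 2, 1))
  //   (b,List(8, 7, 6, 5))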
  def testSortWith() = {
    val rdd3 = sc.parallelize(List(("a", (1, 5)), ("a", (2, 4)), ("a", (3, 6)), ("a", (4, 1)),
      ("b", (5, 2)), ("b", (6, 3)), ("b", (7, 9)), ("b", (8, 8))))
    rdd3.groupByKey().map(t => {
      val its = t._2
      // sort descending by the tuple's second element and keep at most the top 5
      val sortResult = its.toList.sortWith(_._2 > _._2).take(5)
      (t._1, sortResult)
    }).foreach(println)
  }
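  // Expected output of testSortWith (key order may vary across runs):
  //   (a,List((3,6), (1,5), (2,4), (4,1)))
  //   (b,List((7,9), (8,8), (6,3), (5,2)))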
  def testSortBy() = {
    println("map === sortBy")
    val word = sc.parallelize(List(("a", (1, 5)), ("a", (2, 4)), ("a", (3, 6)), ("a", (4, 1)),
      ("b", (5, 2)), ("b", (6, 3)), ("b", (7, 9)), ("b", (8, 8))))
    val group = word
      .groupByKey()
      .map(t => {
        // Ordering[Int].reverse sorts descending by the second element
        val topItem = t._2.toArray.sortBy(_._2)(Ordering[Int].reverse)
        // print each (key, value) pair of the sorted group
        for (tp <- topItem) {
          println(t._1 + " " + tp)
        }
        (t._1, topItem.toList)
      })
    group.foreach(println)
  }
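
  // A sketch of the same per-key Top-N computed with aggregateByKey instead of
  // groupByKey (an addition for illustration, not part of the original code):
  // only a running Top-N list is kept per key, so whole groups are never
  // materialized in memory. `n` is an assumed, illustrative parameter.
  def testAggregateByKey(n: Int = 3) = {
    val word = sc.parallelize(List(("a", (1, 5)), ("a", (2, 4)), ("a", (3, 6)), ("a", (4, 1)),
      ("b", (5, 2)), ("b", (6, 3)), ("b", (7, 9)), ("b", (8, 8))))
    word.aggregateByKey(List.empty[(Int, Int)])(
      (acc, v) => (v :: acc).sortBy(-_._2).take(n), // fold one value into the running Top-N
      (l, r) => (l ++ r).sortBy(-_._2).take(n)      // merge partial Top-N lists across partitions
    ).foreach(println)
  }
}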