本节课基于案例试图通过一节课贯通Spark Streaming流计算框架的运行源码,这节课建立在之前4节课的基础之上,本节内容分成2部分:1,在线动态计算分类最热门商品案例回顾与演示 2,基于案例贯通Spark Streaming的运行源码。
在线动态计算分类最热门商品案例回顾与演示这个基于之前的课程内容。
OnlineTheTop3ItemForEachCategory2DB.scala业务代码如下:
1. package com.dt.spark.sparkstreaming
2.
3. importorg.apache.spark.SparkConf
4. import org.apache.spark.sql.Row
5. importorg.apache.spark.sql.hive.HiveContext
6. importorg.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
7. import org.apache.spark.streaming.{Seconds,StreamingContext}
8.
9. /**
10. * 使用Spark Streaming+Spark SQL来在线动态计算电商中不同类别中最热门的商品排名,例如手机这个类别下面最热门的三种手机、电视这个类别
11. * 下最热门的三种电视,该实例在实际生产环境下具有非常重大的意义;
12. *
13. * @author DT大数据梦工厂
14. * 新浪微博:http://weibo.com/ilovepains/
15. *
16. *
17. * 实现技术:Spark Streaming+Spark SQL,之所以Spark Streaming能够使用ML、sql、graphx等功能是因为有foreachRDD和Transform
18. * 等接口,这些接口中其实是基于RDD进行操作,所以以RDD为基石,就可以直接使用Spark其它所有的功能,就像直接调用API一样简单。
19. * 假设说这里的数据的格式:user item category,例如Rocky Samsung Android
20. */
21. object OnlineTheTop3ItemForEachCategory2DB{
22. def main(args: Array[String]){
23. /**
24. * 第1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,
25. * 例如说通过setMaster来设置程序要链接的Spark集群的Master的URL,如果设置
26. * 为local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(例如
27. * 只有1G的内存)的初学者 *
28. */
29. val conf = new SparkConf() //创建SparkConf对象
30. conf.setAppName("OnlineTheTop3ItemForEachCategory2DB")//设置应用程序的名称,在程序运行的监控界面可以看到名称
31. // conf.setMaster("spark://Master:7077") //此时,程序在Spark集群
32. conf.setMaster("local[6]")
33. //设置batchDuration时间间隔来控制Job生成的频率并且创建SparkStreaming执行的入口
34. val ssc = new StreamingContext(conf,Seconds(5))
35.
36. ssc.checkpoint("/root/Documents/SparkApps/checkpoint")
37.
38.
39. val userClickLogsDStream =ssc.socketTextStream("Master", 9999)
40.
41. val formattedUserClickLogsDStream =userClickLogsDStream.map(clickLog =>
42. (clickLog.split(" ")(2) +"_" + clickLog.split(" ")(1), 1))
43.
44. // val categoryUserClickLogsDStream =formattedUserClickLogsDStream.reduceByKeyAndWindow((v1:Int, v2: Int) => v1 +v2,
45. // (v1:Int, v2: Int) => v1 - v2,Seconds(60), Seconds(20))
46.
47. val categoryUserClickLogsDStream =formattedUserClickLogsDStream.reduceByKeyAndWindow(_+_,
48. _-_, Seconds(60), Seconds(20))
49.
50. categoryUserClickLogsDStream.foreachRDD { rdd=> {
51. if (rdd.isEmpty()) {
52. println("No datainputted!!!")
53. } else {
54. val categoryItemRow =rdd.map(reducedItem => {
55. val category =reducedItem._1.split("_")(0)
56. val item =reducedItem._1.split("_")(1)
57. val click_count = reducedItem._2
58. Row(category, item, click_count)
59. })
60.
61. val structType = StructType(Array(
62. StructField("category",StringType, true),
63. StructField("item",StringType, true),
64. StructField("click_count",IntegerType, true)
65. ))
66.
67. val hiveContext = newHiveContext(rdd.context)
68. val categoryItemDF =hiveContext.createDataFrame(categoryItemRow, structType)
69.
70. categoryItemDF.registerTempTable("categoryItemTable")
71.
72. val reseltDataFram =hiveContext.sql("SELECT category,item,click_count FROM (SELECTcategory,item,click_count,row_number()" +
73. " OVER (PARTITION BY categoryORDER BY click_count DESC) rank FROM categoryItemTable) subquery " +
74. " WHERE rank
最近更新
- 深拷贝和浅拷贝的区别(重点)
- 【Vue】走进Vue框架世界
- 【云服务器】项目部署—搭建网站—vue电商后台管理系统
- 【React介绍】 一文带你深入React
- 【React】React组件实例的三大属性之state,props,refs(你学废了吗)
- 【脚手架VueCLI】从零开始,创建一个VUE项目
- 【React】深入理解React组件生命周期----图文详解(含代码)
- 【React】DOM的Diffing算法是什么?以及DOM中key的作用----经典面试题
- 【React】1_使用React脚手架创建项目步骤--------详解(含项目结构说明)
- 【React】2_如何使用react脚手架写一个简单的页面?