您当前的位置: 首页 >  ar

段智华

暂无认证

  • 0浏览

    0关注

    1232博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

第5课:基于案例一节课贯通Spark Streaming流计算框架的运行源码

段智华 发布时间:2017-09-06 06:10:05 ,浏览量:0

本节课基于案例试图通过一节课贯通Spark Streaming流计算框架的运行源码,这节课建立在之前4节课的基础之上,本节内容分成2部分:1,在线动态计算分类最热门商品案例回顾与演示 2,基于案例贯通Spark Streaming的运行源码。

在线动态计算分类最热门商品案例回顾与演示这个基于之前的课程内容。

OnlineTheTop3ItemForEachCategory2DB.scala业务代码如下:

1.          package com.dt.spark.sparkstreaming

2.          

3.         importorg.apache.spark.SparkConf

4.         import org.apache.spark.sql.Row

5.         importorg.apache.spark.sql.hive.HiveContext

6.         importorg.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

7.         import org.apache.spark.streaming.{Seconds,StreamingContext}

8.          

9.         /**

10.        * 使用Spark Streaming+Spark SQL来在线动态计算电商中不同类别中最热门的商品排名,例如手机这个类别下面最热门的三种手机、电视这个类别

11.        * 下最热门的三种电视,该实例在实际生产环境下具有非常重大的意义;

12.        *

13.        * @author DT大数据梦工厂

14.        * 新浪微博:http://weibo.com/ilovepains/

15.        *

16.        *

17.        *   实现技术:Spark Streaming+Spark SQL,之所以Spark Streaming能够使用ML、sql、graphx等功能是因为有foreachRDD和Transform

18.        * 等接口,这些接口中其实是基于RDD进行操作,所以以RDD为基石,就可以直接使用Spark其它所有的功能,就像直接调用API一样简单。

19.        *  假设说这里的数据的格式:user item category,例如Rocky Samsung Android

20.        */

21.      object OnlineTheTop3ItemForEachCategory2DB{

22.        def main(args: Array[String]){

23.          /**

24.            * 第1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,

25.            * 例如说通过setMaster来设置程序要链接的Spark集群的Master的URL,如果设置

26.            * 为local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(例如

27.            * 只有1G的内存)的初学者       *

28.            */

29.          val conf = new SparkConf() //创建SparkConf对象

30.          conf.setAppName("OnlineTheTop3ItemForEachCategory2DB")//设置应用程序的名称,在程序运行的监控界面可以看到名称

31.      //   conf.setMaster("spark://Master:7077") //此时,程序在Spark集群

32.          conf.setMaster("local[6]")

33.          //设置batchDuration时间间隔来控制Job生成的频率并且创建SparkStreaming执行的入口

34.          val ssc = new StreamingContext(conf,Seconds(5))

35.       

36.          ssc.checkpoint("/root/Documents/SparkApps/checkpoint")

37.       

38.       

39.          val userClickLogsDStream =ssc.socketTextStream("Master", 9999)

40.       

41.          val formattedUserClickLogsDStream =userClickLogsDStream.map(clickLog =>

42.              (clickLog.split(" ")(2) +"_" + clickLog.split(" ")(1), 1))

43.       

44.      //    val categoryUserClickLogsDStream =formattedUserClickLogsDStream.reduceByKeyAndWindow((v1:Int, v2: Int) => v1 +v2,

45.      //      (v1:Int, v2: Int) => v1 - v2,Seconds(60), Seconds(20))

46.       

47.          val categoryUserClickLogsDStream =formattedUserClickLogsDStream.reduceByKeyAndWindow(_+_,

48.            _-_, Seconds(60), Seconds(20))

49.       

50.          categoryUserClickLogsDStream.foreachRDD { rdd=> {

51.            if (rdd.isEmpty()) {

52.              println("No datainputted!!!")

53.            } else {

54.              val categoryItemRow =rdd.map(reducedItem => {

55.                val category =reducedItem._1.split("_")(0)

56.                val item =reducedItem._1.split("_")(1)

57.                val click_count = reducedItem._2

58.                Row(category, item, click_count)

59.              })

60.       

61.              val structType = StructType(Array(

62.                StructField("category",StringType, true),

63.                StructField("item",StringType, true),

64.                StructField("click_count",IntegerType, true)

65.              ))

66.       

67.              val hiveContext = newHiveContext(rdd.context)

68.              val categoryItemDF =hiveContext.createDataFrame(categoryItemRow, structType)

69.       

70.              categoryItemDF.registerTempTable("categoryItemTable")

71.       

72.              val reseltDataFram =hiveContext.sql("SELECT category,item,click_count FROM (SELECTcategory,item,click_count,row_number()" +

73.                " OVER (PARTITION BY categoryORDER BY click_count DESC) rank FROM categoryItemTable) subquery " +

74.                " WHERE rank

关注
打赏
1659361485
查看更多评论
立即登录/注册

微信扫码登录

0.0596s