文章目录
一、实时热门商品统计
1.1、基本需求
- 一、实时热门商品统计
- 1.1、基本需求
- 1.1.1、需求分析
- 1.2、模块实现
- 1.2.1、完整代码
- 统计近1小时内的热门商品,每5分钟更新一次
- 热门度用浏览次数(“pv”)来衡量
我们将实现一个“实时热门商品”的需求,可以将“实时热门商品”理解为:每隔5分钟输出最近一小时内点击量最多的前N个商品。 将这个需求进行分解我们大概要做这么几件事情: • 抽取出业务时间戳,告诉Flink框架基于业务时间做窗口 • 过滤出点击行为数据 • 按一小时的窗口大小,每5分钟统计一次,做滑动窗口聚合(Sliding Window) • 按每个窗口聚合,输出每个窗口中点击量前N名的商品
1.2、模块实现 1.2.1、完整代码package com.chb.userbehavioranalysis.hotitem
import java.sql.Timestamp
import java.util.Properties
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.java.tuple.{Tuple, Tuple1}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector
import scala.collection.mutable.ListBuffer
object HotItems {
def main(args: Array[String]): Unit = {
val properties = new Properties()
properties.setProperty("bootstrap.servers", "10.0.0.201:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
// 创建一个env
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 显式地定义Time类型
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val dataURL = getClass.getResource("/UserBehavior.csv")
val stream = env
// .readTextFile(dataURL.getPath)
.addSource(new FlinkKafkaConsumer[String]("hotitems", new SimpleStringSchema(), properties))
.map(line => {
val linearray = line.split(",")
UserBehavior(linearray(0).toLong, linearray(1).toLong, linearray(2).toInt, linearray(3), linearray(4).toLong)
})
// 指定时间戳和watermark
.assignAscendingTimestamps(_.timestamp * 1000)
.filter(_.behavior == "pv")
.keyBy("itemId")
.timeWindow(Time.hours(1), Time.minutes(5))
.aggregate(new CountAgg(), new WindowResultFunction())
.keyBy("windowEnd")
.process(new TopNHotItems(3))
.print()
// 调用execute执行任务
env.execute("Hot Items Job")
}
// 自定义实现聚合函数
class CountAgg extends AggregateFunction[UserBehavior, Long, Long] {
override def add(value: UserBehavior, accumulator: Long): Long = accumulator + 1
override def createAccumulator(): Long = 0L
override def getResult(accumulator: Long): Long = accumulator
override def merge(a: Long, b: Long): Long = a + b
}
// 自定义实现Window Function,输出ItemViewCount格式
class WindowResultFunction extends WindowFunction[Long, ItemViewCount, Tuple, TimeWindow] {
override def apply(key: Tuple, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit = {
val itemId: Long = key.asInstanceOf[Tuple1[Long]].f0
val count = input.iterator.next()
out.collect(ItemViewCount(itemId, window.getEnd, count))
}
}
// 自定义实现process function
class TopNHotItems(topSize: Int) extends KeyedProcessFunction[Tuple, ItemViewCount, String] {
// 定义状态ListState
private var itemState: ListState[ItemViewCount] = _
override def open(parameters: Configuration): Unit = {
super.open(parameters)
// 命名状态变量的名字和类型
val itemStateDesc = new ListStateDescriptor[ItemViewCount]("itemState", classOf[ItemViewCount])
itemState = getRuntimeContext.getListState(itemStateDesc)
}
override def processElement(i: ItemViewCount, context: KeyedProcessFunction[Tuple, ItemViewCount, String]#Context, collector: Collector[String]): Unit = {
itemState.add(i)
// 注册定时器,触发时间定为 windowEnd + 1,触发时说明window已经收集完成所有数据
context.timerService.registerEventTimeTimer(i.windowEnd + 1)
}
// 定时器触发操作,从state里取出所有数据,排序取TopN,输出
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
// 获取所有的商品点击信息
val allItems: ListBuffer[ItemViewCount] = ListBuffer()
import scala.collection.JavaConversions._
for (item
关注
打赏
最近更新
- 深拷贝和浅拷贝的区别(重点)
- 【Vue】走进Vue框架世界
- 【云服务器】项目部署—搭建网站—vue电商后台管理系统
- 【React介绍】 一文带你深入React
- 【React】React组件实例的三大属性之state,props,refs(你学废了吗)
- 【脚手架VueCLI】从零开始,创建一个VUE项目
- 【React】深入理解React组件生命周期----图文详解(含代码)
- 【React】DOM的Diffing算法是什么?以及DOM中key的作用----经典面试题
- 【React】1_使用React脚手架创建项目步骤--------详解(含项目结构说明)
- 【React】2_如何使用react脚手架写一个简单的页面?