1. Requirements Analysis
1.1 Basic Requirements
- Compute real-time traffic statistics from the web server's logs
- Count the page hits per minute, output the 5 most-visited URLs, and refresh the result every 5 seconds
- Convert the time field in the Apache server log into a timestamp and use it as the Event Time
- Build a sliding window with a length of 1 minute and a slide of 5 seconds
The module we are implementing now is "real-time traffic statistics". For an e-commerce platform, the entry traffic from user logins and the visit traffic of individual pages are both data well worth analyzing, and they can be extracted straightforwardly from the web server's logs. Here we implement the most basic "page view" statistic: read every line of the server log and count how often users visit each URL over a period of time. Concretely: every 5 seconds, output the N most-visited URLs within the last minute. As you can see, this requirement is very similar to the earlier "real-time hot items statistics" module.
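The apache.log file itself is not reproduced in this post. Judging from how the map function below indexes the space-separated fields (0 = ip, 1 = userId, 3 = time, 5 = method, 6 = url) and from the dd/MM/yyyy:HH:mm:ss date pattern, a log line is assumed to look roughly like this (hypothetical example, not taken from the real file):
83.149.9.216 - - 17/05/2015:10:05:03 +0000 GET /presentations/logstash-monitorama-2013/kibana.png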
package com.chb.userbehavioranalysis.traffic
import java.sql.Timestamp
import java.text.SimpleDateFormat
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.collection.mutable.ListBuffer
// Input data format
case class ApacheLogEvent(ip: String, userId: String, eventTime: Long, method: String, url: String)
// Output data format
case class UrlViewCount(url: String, windowEnd: Long, count: Long)
/**
 * Real-time traffic statistics
 */
object NetworkTraffic {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val stream = env
.readTextFile(getClass.getResource("/apache.log").getPath)
.map(line => {
val linearray = line.split(" ")
// Define the date format used to convert the log time into an epoch timestamp
val simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
val timestamp = simpleDateFormat.parse(linearray(3)).getTime
ApacheLogEvent(linearray(0), linearray(1), timestamp, linearray(5), linearray(6))
})
// Handle out-of-order data: assign timestamps and generate watermarks, allowing 10 seconds of lateness
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent](Time.seconds(10)) {
override def extractTimestamp(t: ApacheLogEvent): Long = {
t.eventTime
}
})
.filter(_.method == "GET")
.keyBy(_.url)
.timeWindow(Time.minutes(1), Time.seconds(5))
.aggregate(new CountAgg(), new WindowResultFunction())
.keyBy(_.windowEnd)
.process(new TopNHotUrls(5))
.print()
env.execute("Network Traffic Analysis Job")
}
class CountAgg extends AggregateFunction[ApacheLogEvent, Long, Long] {
override def add(value: ApacheLogEvent, accumulator: Long): Long = accumulator + 1
override def createAccumulator(): Long = 0L
override def getResult(accumulator: Long): Long = accumulator
override def merge(a: Long, b: Long): Long = a + b
}
class WindowResultFunction extends WindowFunction[Long, UrlViewCount, String, TimeWindow] {
override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[UrlViewCount]): Unit = {
val url: String = key
val count = input.iterator.next()
out.collect(UrlViewCount(url, window.getEnd, count))
}
}
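Note the division of labor in .aggregate(new CountAgg(), new WindowResultFunction()): CountAgg incrementally folds each window's events into a single running count as they arrive, so Flink never buffers the raw events. As a result, the Iterable handed to WindowResultFunction contains exactly one element, the final count, which is why input.iterator.next() is safe here.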
// Custom process function: rank the most-visited URLs and emit them in sorted order
class TopNHotUrls(topSize: Int) extends KeyedProcessFunction[Long, UrlViewCount, String] {
// State variable holding every UrlViewCount for the current windowEnd, initialized lazily
lazy val urlState: ListState[UrlViewCount] = getRuntimeContext.getListState(new ListStateDescriptor[UrlViewCount]("urlState", classOf[UrlViewCount]))
override def processElement(i: UrlViewCount, context: KeyedProcessFunction[Long, UrlViewCount, String]#Context, collector: Collector[String]): Unit = {
// Save each incoming record into the state
urlState.add(i)
// Register a timer to fire at windowEnd + 10 seconds, by which time all data for this window should have arrived
context.timerService().registerEventTimeTimer(i.windowEnd + 10 * 1000)
}
// Implement onTimer: when the watermark passes windowEnd + 10s, sort the counts and emit the top N
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, UrlViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
// Fetch all URL view counts from the state
val allUrlViews: ListBuffer[UrlViewCount] = ListBuffer()
import scala.collection.JavaConversions._
for (urlView <- urlState.get()) {
allUrlViews += urlView
}
// Clear the state so the next window starts fresh
urlState.clear()
// Sort by count in descending order and keep the top N
val sortedUrlViews = allUrlViews.sortBy(_.count)(Ordering.Long.reverse).take(topSize)
// Format the ranking into a string for printing
val result: StringBuilder = new StringBuilder
result.append("====================================\n")
result.append("Time: ").append(new Timestamp(timestamp - 10 * 1000)).append("\n")
for (i <- sortedUrlViews.indices) {
val currentUrlView: UrlViewCount = sortedUrlViews(i)
result.append("No").append(i + 1).append(":")
.append(" URL=").append(currentUrlView.url)
.append(" traffic=").append(currentUrlView.count).append("\n")
}
result.append("====================================\n\n")
out.collect(result.toString)
}
}
}
2. PageView (Page View Count)
package com.chb.userbehavioranalysis.traffic
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
// Input data format
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
/**
 * Page view count
 */
object PageView {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
// Use a relative path to define the data source
val resource = getClass.getResource("/UserBehavior.csv")
val dataStream = env.readTextFile(resource.getPath)
.map(data => {
val dataArray = data.split(",")
UserBehavior(dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim.toInt, dataArray(3).trim, dataArray(4).trim.toLong)
})
.assignAscendingTimestamps(_.timestamp * 1000L)
.filter(_.behavior == "pv") // only count pv events
.map(data => ("pv", 1))
.keyBy(_._1)
.timeWindow(Time.hours(1))
.sum(1)
dataStream.print("pv count")
env.execute("page view jpb")
}
}
3. UniqueVisitor (Unique Visitor Count)
package com.chb.userbehavioranalysis.traffic
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
case class UvCount(windowEnd: Long, uvCount: Long)
/**
 * Unique visitor count
 */
object UniqueVisitor {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
// Use a relative path to define the data source
val resource = getClass.getResource("/UserBehavior.csv")
val dataStream = env.readTextFile(resource.getPath)
.map(data => {
val dataArray = data.split(",")
UserBehavior(dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim.toInt, dataArray(3).trim, dataArray(4).trim.toLong)
})
.assignAscendingTimestamps(_.timestamp * 1000L)
.filter(_.behavior == "pv") // only count pv events
.timeWindowAll(Time.hours(1))
.apply(new UvCountByWindow())
dataStream.print()
env.execute("uv job")
}
}
class UvCountByWindow() extends AllWindowFunction[UserBehavior, UvCount, TimeWindow] {
override def apply(window: TimeWindow, input: Iterable[UserBehavior], out: Collector[UvCount]): Unit = {
// Define a Scala Set to collect all userIds in the window and deduplicate them
var idSet = Set[Long]()
// Collect the ids of all records in the current window into the set, then emit the set's size
for (userBehavior <- input) {
idSet += userBehavior.userId
}
out.collect(UvCount(window.getEnd, idSet.size))
}
}
4. UvWithBloom (UV Count with a Bloom Filter)
package com.chb.userbehavioranalysis.traffic
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import redis.clients.jedis.Jedis
/**
 * Unique visitor count using a Bloom filter, for cardinalities too large to keep in an in-memory Set
 */
object UvWithBloom {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
// Use a relative path to define the data source
val resource = getClass.getResource("/UserBehavior.csv")
val dataStream = env.readTextFile(resource.getPath)
.map(data => {
val dataArray = data.split(",")
UserBehavior(dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim.toInt, dataArray(3).trim, dataArray(4).trim.toLong)
})
.assignAscendingTimestamps(_.timestamp * 1000L)
.filter(_.behavior == "pv") // only count pv events
.map(data => ("dummyKey", data.userId))
.keyBy(_._1)
.timeWindow(Time.hours(1))
.trigger(new MyTrigger())
.process(new UvCountWithBloom())
dataStream.print()
env.execute("uv with bloom job")
}
}
// Custom window trigger
class MyTrigger() extends Trigger[(String, Long), TimeWindow] {
override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE
override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE
override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {}
override def onElement(element: (String, Long), timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
// Trigger the window computation for every incoming element, and purge the window state immediately afterwards
TriggerResult.FIRE_AND_PURGE
}
}
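The point of this trigger is to keep Flink's own window state empty: FIRE_AND_PURGE evaluates and then clears the window on every single element, so the hour-long window never accumulates userIds in Flink state. All deduplication state lives in an external bitmap instead (Redis, in the process function sketched below), which is what makes UV counting feasible at cardinalities far beyond the in-memory Set used by UniqueVisitor above.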
// Define a Bloom filter
class Bloom(size: Long) extends Serializable {
// Total size of the bitmap: defaults to 1 << 27 bits, i.e. 16 MB
private val cap = if (size > 0) size else 1 << 27
// Hash function: a simple polynomial hash of the string, mapped into the bitmap's range
def hash(value: String, seed: Int): Long = {
var result = 0L
for (i <- 0 until value.length) {
result = result * seed + value.charAt(i)
}
// cap is a power of two, so masking with cap - 1 keeps the offset inside the bitmap
result & (cap - 1)
}
}
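The original post is cut off at this point, but the pipeline above still calls new UvCountWithBloom() in its .process step, so that window function is missing. Below is a minimal sketch of what it plausibly looks like, under assumptions not confirmed by the surviving text: a Redis instance on localhost:6379 accessed through Jedis, the window-end timestamp used as the Redis key of the bitmap, and a Redis hash named "count" holding the running UV count per window.
class UvCountWithBloom() extends ProcessWindowFunction[(String, Long), UvCount, String, TimeWindow] {
// Lazily create the redis connection and a Bloom filter with a 1 << 29-bit (64 MB) bitmap
lazy val jedis = new Jedis("localhost", 6379)
lazy val bloom = new Bloom(1 << 29)
override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[UvCount]): Unit = {
// Use the window end as the redis key of this window's bitmap
val storeKey = context.window.getEnd.toString
// Read the current UV count for this window from the "count" hash
var count = 0L
if (jedis.hget("count", storeKey) != null) {
count = jedis.hget("count", storeKey).toLong
}
// Because of FIRE_AND_PURGE, the window holds exactly one element when it fires
val userId = elements.last._2.toString
val offset = bloom.hash(userId, 61)
// If the bit is unset, this userId has not been seen in this window yet
val isExist = jedis.getbit(storeKey, offset)
if (!isExist) {
jedis.setbit(storeKey, offset, true)
jedis.hset("count", storeKey, (count + 1).toString)
out.collect(UvCount(storeKey.toLong, count + 1))
} else {
out.collect(UvCount(storeKey.toLong, count))
}
}
}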