您当前的位置: 首页 > 

宝哥大数据

暂无认证

  • 0浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

实时流量统计

宝哥大数据 发布时间:2020-07-09 18:37:10 ,浏览量:0

一、需求分析 1.1、基本需求
  • 从web服务器的日志中,统计实时的访问流量
  • 统计每分钟的ip访问量,取出访问量最大的5个地址,每5秒更新一次
1.1.1、解决思路
  • 将 apache 服务器日志中的时间,转换为时间戳,作为 Event Time
  • 构建滑动窗口,窗口长度为1分钟,滑动距离为5秒
1.2、模块实现

  我们现在要实现的模块是 “实时流量统计”。对于一个电商平台而言,用户登录的入口流量、不同页面的访问流量都是值得分析的重要数据,而这些数据,可以简单地从web服务器的日志中提取出来。我们在这里实现最基本的“页面浏览数”的统计,也就是读取服务器日志中的每一行log,统计在一段时间内用户访问url的次数。   具体步骤为:每隔5秒,输出最近10分钟内访问量最多的前N个URL。可以看出,这个需求与之前“实时热门商品统计”非常类似

package com.chb.userbehavioranalysis.traffic


import java.sql.Timestamp
import java.text.SimpleDateFormat

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer


// 输入数据格式
case class ApacheLogEvent(ip: String, userId: String, eventTime: Long, method: String, url: String)

// 输出数据格式
case class UrlViewCount(url: String, windowEnd: Long, count: Long)

/**
 * 实时流量统计
 */
object NetworkTraffic {

    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.setParallelism(1)

        val stream = env
            .readTextFile(getClass.getResource("/apache.log").getPath)
            .map(line => {
                val linearray = line.split(" ")
                // 定义时间转换模板将时间转成时间戳
                val simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
                val timestamp = simpleDateFormat.parse(linearray(3)).getTime
                ApacheLogEvent(linearray(0), linearray(1), timestamp, linearray(5), linearray(6))
            })
            // 乱序数据处理,创建时间戳和水位
            .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent](Time.seconds(10)) {
                override def extractTimestamp(t: ApacheLogEvent): Long = {
                    t.eventTime
                }
            })
            .filter(_.method == "GET")
            .keyBy(_.url)
            .timeWindow(Time.minutes(1), Time.seconds(5))
            .aggregate(new CountAgg(), new WindowResultFunction())
            .keyBy(_.windowEnd)
            .process(new TopNHotUrls(5))
            .print()

        env.execute("Network Traffic Analysis Job")
    }

    class CountAgg extends AggregateFunction[ApacheLogEvent, Long, Long] {
        override def add(value: ApacheLogEvent, accumulator: Long): Long = accumulator + 1

        override def createAccumulator(): Long = 0L

        override def getResult(accumulator: Long): Long = accumulator

        override def merge(a: Long, b: Long): Long = a + b
    }

    class WindowResultFunction extends WindowFunction[Long, UrlViewCount, String, TimeWindow] {
        override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[UrlViewCount]): Unit = {
            val url: String = key
            val count = input.iterator.next()
            out.collect(UrlViewCount(url, window.getEnd, count))
        }
    }

    // 自定义process function,统计访问量最大的url,排序输出
    class TopNHotUrls(topSize: Int) extends KeyedProcessFunction[Long, UrlViewCount, String] {

        // 直接定义状态变量,懒加载
        lazy val urlState: ListState[UrlViewCount] = getRuntimeContext.getListState(new ListStateDescriptor[UrlViewCount]("urlState", classOf[UrlViewCount]))

        override def processElement(i: UrlViewCount, context: KeyedProcessFunction[Long, UrlViewCount, String]#Context, collector: Collector[String]): Unit = {
            // 把每条数据保存到状态中
            urlState.add(i)
            // 注册一个定时器,windowEnd + 10秒 时触发
            context.timerService().registerEventTimeTimer(i.windowEnd + 10 * 1000)
        }

        // 实现ontimer
        override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, UrlViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
            // 从状态中获取所有的Url访问量
            val allUrlViews: ListBuffer[UrlViewCount] = ListBuffer()
            import scala.collection.JavaConversions._
            for (urlView  ("pv", 1))
            .keyBy(_._1)
            .timeWindow(Time.hours(1))
            .sum(1)

        dataStream.print("pv count")

        env.execute("page view jpb")
    }
}

三、UniqueVisitor 独立访客记录数
package com.chb.userbehavioranalysis.traffic

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector


case class UvCount(windowEnd: Long, uvCount: Long)

/**
 * 独立访客记录数
 */
object UniqueVisitor {
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.setParallelism(1)

        // 用相对路径定义数据源
        val resource = getClass.getResource("/UserBehavior.csv")
        val dataStream = env.readTextFile(resource.getPath)
            .map(data => {
                val dataArray = data.split(",")
                UserBehavior(dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim.toInt, dataArray(3).trim, dataArray(4).trim.toLong)
            })
            .assignAscendingTimestamps(_.timestamp * 1000L)
            .filter(_.behavior == "pv") // 只统计pv操作
            .timeWindowAll(Time.hours(1))
            .apply(new UvCountByWindow())

        dataStream.print()
        env.execute("uv job")
    }
}

class UvCountByWindow() extends AllWindowFunction[UserBehavior, UvCount, TimeWindow] {
    override def apply(window: TimeWindow, input: Iterable[UserBehavior], out: Collector[UvCount]): Unit = {
        // 定义一个scala set,用于保存所有的数据userId并去重
        var idSet = Set[Long]()
        // 把当前窗口所有数据的ID收集到set中,最后输出set的大小
        for (userBehavior  {
                val dataArray = data.split(",")
                UserBehavior(dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim.toInt, dataArray(3).trim, dataArray(4).trim.toLong)
            })
            .assignAscendingTimestamps(_.timestamp * 1000L)
            .filter(_.behavior == "pv") // 只统计pv操作
            .map(data => ("dummyKey", data.userId))
            .keyBy(_._1)
            .timeWindow(Time.hours(1))
            .trigger(new MyTrigger())
            .process(new UvCountWithBloom())

        dataStream.print()

        env.execute("uv with bloom job")
    }
}

// 自定义窗口触发器
class MyTrigger() extends Trigger[(String, Long), TimeWindow] {
    override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE

    override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE

    override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {}

    override def onElement(element: (String, Long), timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
        // 每来一条数据,就直接触发窗口操作,并清空所有窗口状态
        TriggerResult.FIRE_AND_PURGE
    }
}

// 定义一个布隆过滤器
class Bloom(size: Long) extends Serializable {
    // 位图的总大小,默认16M
    private val cap = if (size > 0) size else 1             
关注
打赏
1587549273
查看更多评论
0.0417s