在电商网站中,订单的支付作为直接与营销收入挂钩的一环,在业务流程中非常重要。对于订单而言,为了正确控制业务流程,也为了增加用户的支付意愿,网站一般会设置一个支付失效时间,超过一段时间不支付的订单就会被取消。另外,对于订单的支付,我们还应保证用户支付的正确性,这可以通过第三方支付平台的交易数据来做一个实时对账。
一、订单支付实时监控在电商平台中,最终创造收入和利润的是用户下单购买的环节;更具体一点,是用户真正完成支付动作的时候。用户下单的行为可以表明用户对商品的需求,但在现实中,并不是每次下单都会被用户立刻支付。当拖延一段时间后,用户支付的意愿会降低。所以为了让用户更有紧迫感从而提高支付转化率,同时也为了防范订单支付环节的安全风险,电商网站往往会对订单状态进行监控,设置一个失效时间(比如 15 分钟),如果下单后一段时间仍未支付,订单就会被取消。
1.1、使用CEP实现订单超时检测package com.chb.userbehavioranalysis.order
import java.util
import org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction}
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
// 定义输入订单事件的样例类
case class OrderEvent(orderId: Long, eventType: String, txId: String, eventTime: Long)
// 定义输出结果样例类
case class OrderResult(orderId: Long, resultMsg: String)
/**
* 订单超时
*/
object OrderTimeout {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
// 1. 读取订单数据
val resource = getClass.getResource("/OrderLog.csv")
val orderEventStream = env.readTextFile(resource.getPath)
// val orderEventStream = env.socketTextStream("chb1", 8888)
.map(data => {
val dataArray = data.split(",")
OrderEvent(dataArray(0).trim.toLong, dataArray(1).trim, dataArray(2).trim, dataArray(3).trim.toLong)
})
.assignAscendingTimestamps(_.eventTime * 1000L)
.keyBy(_.orderId)
// 2. 定义一个匹配模式
val orderPayPattern = Pattern.begin[OrderEvent]("begin").where(_.eventType == "create")
.followedBy("follow").where(_.eventType == "pay") // 宽松紧邻, 可以再create后,对订单进行修改,只要最终支付就可以了
.within(Time.minutes(15))
// 3. 把模式应用到stream上,得到一个pattern stream
val patternStream = CEP.pattern(orderEventStream, orderPayPattern)
// 4. 调用select方法,提取事件序列,超时的事件要做报警提示
val orderTimeoutOutputTag = new OutputTag[OrderResult]("orderTimeout")
// 使用侧输出流 对异常数据处理。
val resultStream = patternStream.select(orderTimeoutOutputTag,
new OrderTimeoutSelect(),
new OrderPaySelect())
resultStream.print("payed")
resultStream.getSideOutput(orderTimeoutOutputTag).print("timeout")
env.execute("order timeout job")
}
}
// 自定义超时事件序列处理函数
class OrderTimeoutSelect() extends PatternTimeoutFunction[OrderEvent, OrderResult] {
override def timeout(map: util.Map[String, util.List[OrderEvent]], l: Long): OrderResult = {
val timeoutOrderId = map.get("begin").iterator().next().orderId
OrderResult(timeoutOrderId, "timeout")
}
}
// 自定义正常支付事件序列处理函数
class OrderPaySelect() extends PatternSelectFunction[OrderEvent, OrderResult] {
override def select(map: util.Map[String, util.List[OrderEvent]]): OrderResult = {
val payedOrderId = map.get("follow").iterator().next().orderId
OrderResult(payedOrderId, "payed successfully")
}
}
1.2、使用状态编程实现订单超时检测
CEP底层使用通过状态机处理不同状态的跳转,所以我们本节通过状态编程,使用定时器,状态器处理。
package com.chb.userbehavioranalysis.order
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object OrderTimeoutWithoutCep {
val orderTimeoutOutputTag = new OutputTag[OrderResult]("orderTimeout")
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
// 读取订单数据
val resource = getClass.getResource("/OrderLog.csv")
// val orderEventStream = env.readTextFile(resource.getPath)
val orderEventStream = env.socketTextStream("chb1", 8888)
.map(data => {
val dataArray = data.split(",")
OrderEvent(dataArray(0).trim.toLong, dataArray(1).trim, dataArray(2).trim, dataArray(3).trim.toLong)
})
.assignAscendingTimestamps(_.eventTime * 1000L)
.keyBy(_.orderId)
// 定义process function进行超时检测
// val timeoutWarningStream = orderEventStream.process( new OrderTimeoutWarning() )
val orderResultStream = orderEventStream.process(new OrderPayMatch())
orderResultStream.print("payed")
orderResultStream.getSideOutput(orderTimeoutOutputTag).print("timeout")
env.execute("order timeout without cep job")
}
class OrderPayMatch() extends KeyedProcessFunction[Long, OrderEvent, OrderResult] {
lazy val isPayedState: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("ispayed-state", classOf[Boolean]))
// 保存定时器的时间戳为状态
lazy val timerState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("timer-state", classOf[Long]))
override def processElement(value: OrderEvent, ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#Context, out: Collector[OrderResult]): Unit = {
// 先读取状态
val isPayed = isPayedState.value()
val timerTs = timerState.value()
// 根据事件的类型进行分类判断,做不同的处理逻辑
if (value.eventType == "create") {
// 1. 如果是create事件,接下来判断pay是否来过
if (isPayed) {
// 1.1 如果已经pay过,匹配成功,输出主流,清空状态
out.collect(OrderResult(value.orderId, "payed successfully"))
ctx.timerService().deleteEventTimeTimer(timerTs)
isPayedState.clear()
timerState.clear()
} else {
// 1.2 如果没有pay过,注册定时器等待pay的到来
val ts = value.eventTime * 1000L + 15 * 60 * 1000L
ctx.timerService().registerEventTimeTimer(ts)
timerState.update(ts)
}
} else if (value.eventType == "pay") {
// 2. 如果是pay事件,那么判断是否create过,用timer表示
if (timerTs > 0) {
// 2.1 如果有定时器,说明已经有create来过
// 继续判断,是否超过了timeout时间
if (timerTs > value.eventTime * 1000L) {
// 2.1.1 如果定时器时间还没到,那么输出成功匹配
out.collect(OrderResult(value.orderId, "payed successfully"))
} else {
// 2.1.2 如果当前pay的时间已经超时,那么输出到侧输出流
ctx.output(orderTimeoutOutputTag, OrderResult(value.orderId, "payed but already timeout"))
}
// 输出结束,清空状态
ctx.timerService().deleteEventTimeTimer(timerTs)
isPayedState.clear()
timerState.clear()
} else {
// 2.2 pay先到了,更新状态,注册定时器等待create
isPayedState.update(true)
ctx.timerService().registerEventTimeTimer(value.eventTime * 1000L)
timerState.update(value.eventTime * 1000L)
}
}
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#OnTimerContext, out: Collector[OrderResult]): Unit = {
// 根据状态的值,判断哪个数据没来
if (isPayedState.value()) {
// 如果为true,表示pay先到了,没等到create
ctx.output(orderTimeoutOutputTag, OrderResult(ctx.getCurrentKey, "already payed but not found create log"))
} else {
// 表示create到了,没等到pay
ctx.output(orderTimeoutOutputTag, OrderResult(ctx.getCurrentKey, "order timeout"))
}
isPayedState.clear()
timerState.clear()
}
}
}
// 实现自定义的处理函数
class OrderTimeoutWarning() extends KeyedProcessFunction[Long, OrderEvent, OrderResult] {
// 保存pay是否来过的状态
lazy val isPayedState: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("ispayed-state", classOf[Boolean]))
override def processElement(value: OrderEvent, ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#Context, out: Collector[OrderResult]): Unit = {
// 先取出状态标识位
val isPayed = isPayedState.value()
if (value.eventType == "create" && !isPayed) {
// 如果遇到了create事件,并且pay没有来过,注册定时器开始等待
ctx.timerService().registerEventTimeTimer(value.eventTime * 1000L + 15 * 60 * 1000L)
} else if (value.eventType == "pay") {
// 如果是pay事件,直接把状态改为true
isPayedState.update(true)
}
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#OnTimerContext, out: Collector[OrderResult]): Unit = {
// 判断isPayed是否为true
val isPayed = isPayedState.value()
if (isPayed) {
out.collect(OrderResult(ctx.getCurrentKey, "order payed successfully"))
} else {
out.collect(OrderResult(ctx.getCurrentKey, "order timeout"))
}
// 清空状态
isPayedState.clear()
}
}
二、实时对账
对于订单支付事件,用户支付完成其实并不算完,我们还得确认平台账户上是否到账了。而往往这会来自不同的日志信息,所以我们要同时读入两条流的数据来做 合 并 处 理 。 这 里 我 们 利 用 connect 将 两 条 流 进 行 连 接 , 然 后 用 自 定 义 的CoProcessFunction 进行处理。
2.1、package com.chb.userbehavioranalysis.order
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
// 定义接收流事件的样例类
case class ReceiptEvent(txId: String, payChannel: String, eventTime: Long)
object TxMacthDetect {
// 定义侧数据流tag
val unmatchedPays = new OutputTag[OrderEvent]("unmatchedPays")
val unmatchedReceipts = new OutputTag[ReceiptEvent]("unmatchedReceipts")
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 读取订单事件流
val resource = getClass.getResource("/OrderLog.csv")
val orderEventStream = env.readTextFile(resource.getPath)
// val orderEventStream = env.socketTextStream("localhost", 7777)
.map(data => {
val dataArray = data.split(",")
OrderEvent(dataArray(0).trim.toLong, dataArray(1).trim, dataArray(2).trim, dataArray(3).trim.toLong)
})
.filter(_.txId != "")
.assignAscendingTimestamps(_.eventTime * 1000L)
.keyBy(_.txId)
// 读取支付到账事件流
val receiptResource = getClass.getResource("/ReceiptLog.csv")
val receiptEventStream = env.readTextFile(receiptResource.getPath)
// val receiptEventStream = env.socketTextStream("localhost", 8888)
.map(data => {
val dataArray = data.split(",")
ReceiptEvent(dataArray(0).trim, dataArray(1).trim, dataArray(2).toLong)
})
.assignAscendingTimestamps(_.eventTime * 1000L)
.keyBy(_.txId)
// 将两条流连接起来,共同处理
val processedStream = orderEventStream.connect(receiptEventStream)
.process(new TxPayMatch())
processedStream.print("matched")
processedStream.getSideOutput(unmatchedPays).print("unmatchedPays")
processedStream.getSideOutput(unmatchedReceipts).print("unmatchReceipts")
env.execute("tx match job")
}
class TxPayMatch() extends CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)] {
// 定义状态来保存已经到达的订单支付事件和到账事件
lazy val payState: ValueState[OrderEvent] = getRuntimeContext.getState(new ValueStateDescriptor[OrderEvent]("pay-state", classOf[OrderEvent]))
lazy val receiptState: ValueState[ReceiptEvent] = getRuntimeContext.getState(new ValueStateDescriptor[ReceiptEvent]("receipt-state", classOf[ReceiptEvent]))
// 订单支付事件数据的处理
override def processElement1(pay: OrderEvent, ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
// 判断有没有对应的到账事件
val receipt = receiptState.value()
if (receipt != null) {
// 如果已经有receipt,在主流输出匹配信息,清空状态
out.collect((pay, receipt))
receiptState.clear()
} else {
// 如果还没到,那么把pay存入状态,并且注册一个定时器等待
payState.update(pay)
ctx.timerService().registerEventTimeTimer(pay.eventTime * 1000L + 5000L)
}
}
// 到账事件的处理
override def processElement2(receipt: ReceiptEvent, ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
// 同样的处理流程
val pay = payState.value()
if (pay != null) {
out.collect((pay, receipt))
payState.clear()
} else {
receiptState.update(receipt)
ctx.timerService().registerEventTimeTimer(receipt.eventTime * 1000L + 5000L)
}
}
override def onTimer(timestamp: Long, ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#OnTimerContext, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
// 到时间了,如果还没有收到某个事件,那么输出报警信息
if (payState.value() != null) {
// recipt没来,输出pay到侧输出流
ctx.output(unmatchedPays, payState.value())
}
if (receiptState.value() != null) {
ctx.output(unmatchedReceipts, receiptState.value())
}
payState.clear()
receiptState.clear()
}
}
}