写点什么

大数据培训 | 电商用户行为分析之订单支付实时监控

作者:@零度
  • 2022 年 6 月 23 日
  • 本文字数:4596 字

    阅读完需:约 15 分钟

​在电商网站中,订单的支付作为直接与营销收入挂钩的一环,在业务流程中非常重要。对于订单而言,为了正确控制业务流程,也为了增加用户的支付意愿,网站一般会设置一个支付失效时间,超过一段时间不支付的订单就会被取消。另外,对于订单的支付,我们还应保证用户支付的正确性,这可以通过第三方支付平台的交易数据来做一个实时对账。在接下来的内容中,我们将实现这两个需求。


模块创建和数据准备


同样地,在 UserBehaviorAnalysis 下新建一个 maven module 作为子项目,命名为 OrderTimeoutDetect。在这个子模块中,我们同样将会用到 flink 的 CEP 库来实现事件流的模式匹配,所以需要在 pom 文件中引入 CEP 的相关依赖:


<dependency>


<groupId>org.apache.flink</groupId>


<artifactId>flink-cep-scala_${scala.binary.version}</artifactId>


<version>${flink.version}</version>


</dependency>


同样,在 src/main/目录下,将默认源文件目录 java 改名为 scala。


更多 Java –大数据 – 前端 – UI/UE - Android - 人工智能资料下载,可访问百度:尚硅谷官网(www.atguigu.com)


代码实现


在电商平台中,最终创造收入和利润的是用户下单购买的环节;更具体一点,是用户真正完成支付动作的时候。用户下单的行为可以表明用户对商品的需求,但在现实中,并不是每次下单都会被用户立刻支付。当拖延一段时间后,用户支付的意愿会降低。所以为了让用户更有紧迫感从而提高支付转化率,同时也为了防范订单支付环节的安全风险,电商网站往往会对订单状态进行监控,设置一个失效时间(比如 15 分钟),如果下单后一段时间仍未支付,订单就会被取消。


使用 CEP 实现


我们首先还是利用 CEP 库来实现这个功能。我们先将事件流按照订单号 orderId 分流,然后定义这样的一个事件模式:在 15 分钟内,事件“create”与“pay”非严格紧邻:


val orderPayPattern = Pattern.beginOrderEvent


.where(_.eventType == "create")


.followedBy("follow")


.where(_.eventType == "pay")


.within(Time.seconds(5))


这样调用.select 方法时,就可以同时获取到匹配出的事件和超时未匹配的事件了。


在 src/main/scala 下继续创建 OrderTimeout.scala 文件,新建一个单例对象。定义样例类 OrderEvent,这是输入的订单事件流;另外还有 OrderResult,这是输出显示 的 订 单 状 态 结 果 。 订 单 数 据 也 本 应 该 从 UserBehavior 日 志 里 提 取 , 由 于 UserBehavior.csv 中没有做相关埋点,我们从另一个文件 OrderLog.csv 中读取登录数据_大数据培训


完整代码如下:


OrderTimeoutDetect/src/main/scala/OrderTimeout.scala


case class OrderEvent(orderId: Long, eventType: String, eventTime: Long)


case class OrderResult(orderId: Long, eventType: String)


object OrderTimeout {


def main(args: Array[String]): Unit = {


val env = StreamExecutionEnvironment.getExecutionEnvironment


env.setParallelism(1)


env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)


val orderEventStream = env.readTextFile("YOUR_PATH\resources\OrderLog.csv")


.map( data => {


val dataArray = data.split(",")


OrderEvent(dataArray(0).toLong, dataArray(1), dataArray(3).toLong)


})


.assignAscendingTimestamps(_.eventTime * 1000)


// 定义一个带匹配时间窗口的模式


val orderPayPattern = Pattern.beginOrderEvent


.where(_.eventType == "create")


.followedBy("follow")


.where(_.eventType == "pay")


.within(Time.minutes(15))


// 定义一个输出标签


val orderTimeoutOutput = OutputTagOrderResult


// 订单事件流根据 orderId 分流,然后在每一条流中匹配出定义好的模式


val patternStream = CEP.pattern(orderEventStream.keyBy("orderId"), orderPayPattern)


val completedResult = patternStream.select(orderTimeoutOutput) {


// 对于已超时的部分模式匹配的事件序列,会调用这个函数


(pattern: Map[String, Iterable[OrderEvent]], timestamp: Long) => {


val createOrder = pattern.get("begin")


OrderResult(createOrder.get.iterator.next().orderId, "timeout")


}


} {


// 检测到定义好的模式序列时,就会调用这个函数


pattern: Map[String, Iterable[OrderEvent]] => {


val payOrder = pattern.get("follow")


OrderResult(payOrder.get.iterator.next().orderId, "success")


}


}


// 拿到同一输出标签中的 timeout 匹配结果(流)


val timeoutResult = completedResult.getSideOutput(orderTimeoutOutput)


completedResult.print()


timeoutResult.print()


env.execute("Order Timeout Detect Job")


}


}


使用 Process Function 实现


我们同样可以利用 Process Function,自定义实现检测订单超时的功能。为了简化问题,我们只考虑超时报警的情形,在 pay 事件超时未发生的情况下,输出超时报警信息。


一个简单的思路是,可以在订单的 create 事件到来后注册定时器,15 分钟后触发;然后再用一个布尔类型的 Value 状态来作为标识位,表明 pay 事件是否发生过。如果 pay 事件已经发生,状态被置为 true,那么就不再需要做什么操作;而如果 pay 事件一直没来,状态一直为 false,到定时器触发时,就应该输出超时报警信息_大数据视频


具体代码实现如下:


OrderTimeoutDetect/src/main/scala/OrderTimeoutWithoutCep.scala


object OrderTimeoutWithoutCep {


def main(args: Array[String]): Unit = {


val env = StreamExecutionEnvironment.getExecutionEnvironment


env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)


env.setParallelism(1)


val orderEventStream = env.readTextFile("YOUR_PATH\resources\OrderLog.csv")


.map( data => {


val dataArray = data.split(",")


OrderEvent(dataArray(0).toLong, dataArray(1), dataArray(3).toLong)


})


更多 Java –大数据 – 前端 – UI/UE - Android - 人工智能资料下载,可访问百度:尚硅谷官网(www.atguigu.com)


.assignAscendingTimestamps(_.eventTime * 1000)


.keyBy(_.orderId)


// 自定义一个 process function,进行 order 的超时检测,输出超时报警信息


val timeoutWarningStream = orderEventStream


.process(new OrderTimeoutAlert)


timeoutWarningStream.print()


env.execute()


}


class OrderTimeoutAlert extends KeyedProcessFunction[Long, OrderEvent, OrderResult]


{


lazy val isPayedState: ValueState[Boolean] = getRuntimeContext.getState(new


ValueStateDescriptor[Boolean]("ispayed-state", classOf[Boolean]))


override def processElement(value: OrderEvent, ctx: KeyedProcessFunction[Long,


OrderEvent, OrderResult]#Context, out: Collector[OrderResult]): Unit = {


val isPayed = isPayedState.value()


if (value.eventType == "create" && !isPayed) {


ctx.timerService().registerEventTimeTimer(value.eventTime * 1000L + 15 * 60 *


1000L)


} else if (value.eventType == "pay") {


isPayedState.update(true)


}


}


override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, OrderEvent,


OrderResult]#OnTimerContext, out: Collector[OrderResult]): Unit = {


val isPayed = isPayedState.value()


if (!isPayed) {


out.collect(OrderResult(ctx.getCurrentKey, "order timeout"))


}


isPayedState.clear()


}


}


}


来自两条流的订单交易匹配


对于订单支付事件,用户支付完成其实并不算完,我们还得确认平台账户上是否到账了。而往往这会来自不同的日志信息,所以我们要同时读入两条流的数据来做 合 并 处 理 。 这 里 我 们 利 用 connect 将 两 条 流 进 行 连 接 , 然 后 用 自 定 义 的 CoProcessFunction 进行处理。


具体代码如下:


TxMatchDetect/src/main/scala/TxMatch


case class OrderEvent( orderId: Long, eventType: String, txId: String, eventTime: Long )


case class ReceiptEvent( txId: String, payChannel: String, eventTime: Long )


object TxMatch {


val unmatchedPays = new OutputTagOrderEvent


val unmatchedReceipts = new OutputTagReceiptEvent


def main(args: Array[String]): Unit = {


val env = StreamExecutionEnvironment.getExecutionEnvironment


env.setParallelism(1)


env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)


val orderEventStream = env.readTextFile("YOUR_PATH\resources\OrderLog.csv")


.map( data => {


val dataArray = data.split(",")


OrderEvent(dataArray(0).toLong, dataArray(1), dataArray(2),


dataArray(3).toLong)


})


.filter(_.txId != "")


.assignAscendingTimestamps(_.eventTime * 1000L)


.keyBy(_.txId)


val receiptEventStream = env.readTextFile("YOUR_PATH\resources\ReceiptLog.csv")


.map( data => {


val dataArray = data.split(",")


ReceiptEvent(dataArray(0), dataArray(1), dataArray(2).toLong)


})


.assignAscendingTimestamps(_.eventTime * 1000L)


.keyBy(_.txId)


val processedStream = orderEventStream


.connect(receiptEventStream)


.process(new TxMatchDetection)


processedStream.getSideOutput(unmatchedPays).print("unmatched pays")


processedStream.getSideOutput(unmatchedReceipts).print("unmatched receipts")


processedStream.print("processed")


env.execute()


}


class TxMatchDetection extends CoProcessFunction[OrderEvent, ReceiptEvent,


(OrderEvent, ReceiptEvent)]{


lazy val payState: ValueState[OrderEvent] = getRuntimeContext.getState(new


ValueStateDescriptorOrderEvent )


lazy val receiptState: ValueState[ReceiptEvent] = getRuntimeContext.getState(new


ValueStateDescriptor[ReceiptEvent]("receipt-state", classOf[ReceiptEvent]) )


override def processElement1(pay: OrderEvent, ctx: CoProcessFunction[OrderEvent,


ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, out: Collector[(OrderEvent,


ReceiptEvent)]): Unit = {


val receipt = receiptState.value()


if( receipt != null ){


receiptState.clear()


out.collect((pay, receipt))


} else{


payState.update(pay)


ctx.timerService().registerEventTimeTimer(pay.eventTime * 1000L)


}


}


override def processElement2(receipt: ReceiptEvent, ctx:


CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, out:


Collector[(OrderEvent, ReceiptEvent)]): Unit = {


val payment = payState.value()


if( payment != null ){


payState.clear()


out.collect((payment, receipt))


} else{


receiptState.update(receipt)


ctx.timerService().registerEventTimeTimer(receipt.eventTime * 1000L)


}


}


更多 Java –大数据 – 前端 – UI/UE - Android - 人工智能资料下载,可访问百度:尚硅谷官网(www.atguigu.com)


override def onTimer(timestamp: Long, ctx: CoProcessFunction[OrderEvent,


ReceiptEvent, (OrderEvent, ReceiptEvent)]#OnTimerContext, out: Collector[(OrderEvent,


ReceiptEvent)]): Unit = {


if ( payState.value() != null ){


ctx.output(unmatchedPays, payState.value())


}


if ( receiptState.value() != null ){


ctx.output(unmatchedReceipts, receiptState.value())


}


payState.clear()


receiptState.clear()


}


}


}


用户头像

@零度

关注

关注尚硅谷,轻松学IT 2021.11.23 加入

IT培训 www.atguigu.com

评论

发布
暂无评论
大数据培训 | 电商用户行为分析之订单支付实时监控_大数据_@零度_InfoQ写作社区