Big Data Training | E-commerce User Behavior Analysis: Statistical Analysis of Business Metrics

Author: @零度
  • June 22, 2022

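Module creation and data preparation

Continue by creating a new Maven module under UserBehaviorAnalysis as a subproject, named MarketAnalysis.

This module has no ready-made data, so we use a custom test source to generate a test data stream, or simply work from a generated test data file.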


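App marketing statistics

With the spread of smartphones, more and more users of e-commerce sites now arrive from mobile devices; compared with logging in through a traditional browser, the mobile app has become the first choice for a growing share of users. E-commerce companies typically promote their apps through many different channels, and the statistics from those channels (for example, clicks on ad links placed on different websites, or app downloads) become important business metrics for marketing.

We first look at promotion statistics broken down by channel. Create the file AppMarketingByChannel.scala under src/main/scala. Since there is no ready-made data, we need a custom test source that generates a stream of user behavior events.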


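Custom test data source

Define a case class MarketingUserBehavior for the source data, then define a SourceFunction named SimulatedEventSource that produces the user behavior source data:

import java.util.UUID
import java.util.concurrent.TimeUnit

import scala.util.Random

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

case class MarketingUserBehavior(userId: Long, behavior: String, channel: String, timestamp: Long)

class SimulatedEventSource extends RichParallelSourceFunction[MarketingUserBehavior] {
  // Checked by the emit loop; cancel() flips it from another thread
  @volatile var running = true
  val channelSet: Seq[String] = Seq("AppStore", "XiaomiStore", "HuaweiStore", "weibo", "wechat", "tieba")
  val behaviorTypes: Seq[String] = Seq("BROWSE", "CLICK", "PURCHASE", "UNINSTALL")
  val rand: Random = Random

  override def run(ctx: SourceContext[MarketingUserBehavior]): Unit = {
    val maxElements = Long.MaxValue
    var count = 0L
    while (running && count < maxElements) {
      // A UUID string is not numeric, so toString.toLong would throw;
      // take 64 bits of a random UUID as the Long user id instead
      val id = UUID.randomUUID().getMostSignificantBits
      val behaviorType = behaviorTypes(rand.nextInt(behaviorTypes.size))
      val channel = channelSet(rand.nextInt(channelSet.size))
      val ts = System.currentTimeMillis()
      ctx.collectWithTimestamp(MarketingUserBehavior(id, behaviorType, channel, ts), ts)
      count += 1
      // Throttle the source to roughly one event every 5 ms
      TimeUnit.MILLISECONDS.sleep(5L)
    }
  }

  override def cancel(): Unit = running = false
}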
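To inspect the kind of events this source emits without starting a Flink job, the generation logic can be sketched in plain Scala. This is only an illustration: the object SimulatedEvents, the helper generate, and its seed, startTs, and stepMs parameters are hypothetical names that do not appear in the project, and a seeded Random replaces the UUID and the wall clock so that runs are repeatable.

```scala
import scala.util.Random

object SimulatedEvents {
  case class MarketingUserBehavior(userId: Long, behavior: String, channel: String, timestamp: Long)

  val channelSet: Seq[String] = Seq("AppStore", "XiaomiStore", "HuaweiStore", "weibo", "wechat", "tieba")
  val behaviorTypes: Seq[String] = Seq("BROWSE", "CLICK", "PURCHASE", "UNINSTALL")

  // Hypothetical helper: builds n events from a seeded Random.
  // startTs and stepMs stand in for the wall clock and the 5 ms sleep of the real source.
  def generate(n: Int, seed: Long, startTs: Long = 0L, stepMs: Long = 5L): Seq[MarketingUserBehavior] = {
    val rand = new Random(seed)
    (0 until n).map { i =>
      MarketingUserBehavior(
        userId = rand.nextLong(),
        behavior = behaviorTypes(rand.nextInt(behaviorTypes.size)),
        channel = channelSet(rand.nextInt(channelSet.size)),
        timestamp = startTs + i * stepMs
      )
    }
  }

  def main(args: Array[String]): Unit =
    generate(5, seed = 42L).foreach(println)
}
```

Because the timestamps increase monotonically, a stream built from such events satisfies the ascending-timestamp assumption used later in the job.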


For more Java, big data, front-end, UI/UE, Android, and AI learning materials, search Baidu for the 尚硅谷 official site, www.atguigu.com


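Statistics by channel

We also define a case class MarketingViewCount for the window output, and a custom ProcessWindowFunction to do the processing. The code is as follows:

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// windowStart and windowEnd hold formatted time strings, matching what the window function emits
case class MarketingViewCount(windowStart: String, windowEnd: String, channel: String, behavior: String, count: Long)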


object AppMarketingByChannel {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val stream: DataStream[MarketingUserBehavior] = env.addSource(new SimulatedEventSource)
      .assignAscendingTimestamps(_.timestamp)

    stream
      .filter(_.behavior != "UNINSTALL")            // drop uninstall events
      .map(data => ((data.channel, data.behavior), 1L))
      .keyBy(_._1)                                  // key by (channel, behavior)
      .timeWindow(Time.hours(1), Time.seconds(1))   // 1-hour window, sliding every second
      .process(new MarketingCountByChannel())
      .print()

    env.execute(getClass.getSimpleName)
  }
}


class MarketingCountByChannel() extends ProcessWindowFunction[((String, String), Long), MarketingViewCount, (String, String), TimeWindow] {
  override def process(key: (String, String),
                       context: Context,
                       elements: Iterable[((String, String), Long)],
                       out: Collector[MarketingViewCount]): Unit = {
    val startTs = context.window.getStart
    val endTs = context.window.getEnd
    val channel = key._1
    val behaviorType = key._2
    val count = elements.size
    out.collect(MarketingViewCount(formatTs(startTs), formatTs(endTs), channel, behaviorType, count))
  }

  private def formatTs(ts: Long) = {
    val df = new SimpleDateFormat("yyyy/MM/dd-HH:mm:ss")
    df.format(new Date(ts))
  }
}
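The job above uses a one-hour window that slides every second, so each event is counted in many overlapping windows (3600 of them for these settings). The sketch below illustrates that assignment and the per-key count in plain Scala, with no Flink dependency. The object SlidingWindowSketch and its two functions are hypothetical names introduced for illustration, and the sketch assumes non-negative timestamps and windows aligned to multiples of the slide.

```scala
object SlidingWindowSketch {
  // Start timestamps of every sliding window [start, start + size) that contains ts.
  // Assumes ts >= 0 and windows aligned to multiples of slide.
  def windowStarts(ts: Long, size: Long, slide: Long): List[Long] = {
    val lastStart = ts - (ts % slide)
    Iterator.iterate(lastStart)(_ - slide).takeWhile(_ > ts - size).toList
  }

  // Counts events per (window start, key); an event is counted once in each window it falls into.
  def countPerWindow(events: Seq[(String, Long)], size: Long, slide: Long): Map[(Long, String), Long] =
    events
      .flatMap { case (key, ts) => windowStarts(ts, size, slide).map(start => (start, key)) }
      .groupBy(identity)
      .map { case (k, vs) => k -> vs.size.toLong }

  def main(args: Array[String]): Unit = {
    // Keys stand in for (channel, behavior); timestamps are in milliseconds
    val events = Seq(("wechat-CLICK", 7L), ("wechat-CLICK", 12L), ("weibo-BROWSE", 7L))
    countPerWindow(events, size = 10L, slide = 5L).toSeq.sorted.foreach(println)
  }
}
```

With a window of 10 ms and a slide of 5 ms, the event at timestamp 7 falls into the windows starting at 0 and 5, which is why the same click shows up in more than one window result.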


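Statistics across all channels (totals)

We can likewise look at promotion statistics without splitting by channel, which gives the total across all promotion channels. Create the file AppMarketingStatistics.scala under src/main/scala, with the following code: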


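import java.text.SimpleDateFormat
import java.util.Date

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object AppMarketingStatistics {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val stream: DataStream[MarketingUserBehavior] = env.addSource(new SimulatedEventSource)
      .assignAscendingTimestamps(_.timestamp)

    stream
      .filter(_.behavior != "UNINSTALL")
      .map(data => ("dummyKey", 1L))                // one shared key, so all events land in the same group
      .keyBy(_._1)
      .timeWindow(Time.hours(1), Time.seconds(1))
      .process(new MarketingCountTotal())
      .print()

    env.execute(getClass.getSimpleName)
  }
}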


class MarketingCountTotal() extends ProcessWindowFunction[(String, Long), MarketingViewCount, String, TimeWindow] {
  override def process(key: String,
                       context: Context,
                       elements: Iterable[(String, Long)],
                       out: Collector[MarketingViewCount]): Unit = {
    val startTs = context.window.getStart
    val endTs = context.window.getEnd
    val count = elements.size
    out.collect(MarketingViewCount(formatTs(startTs), formatTs(endTs), "total", "total", count))
  }

  private def formatTs(ts: Long) = {
    val df = new SimpleDateFormat("yyyy/MM/dd-HH:mm:ss")
    df.format(new Date(ts))
  }
}


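Page ad analysis

Among the marketing metrics of an e-commerce site, besides promotion of its own app, ad placement on its pages also matters (both ads for its own products and ads for other sites). Ad-related statistics are therefore another important marketing metric.

The simplest and most important ad statistic is the number of clicks on page ads. Sites often rely on click counts to set pricing and adjust promotion, and can also use them to collect information on user preferences. More specifically, we can segment users by geographic location and summarize which ads users in different provinces prefer, which helps target ads more precisely.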


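Page ad click counts

Next we count page ad clicks by province. Create the file AdStatisticsByGeo.scala under src/main/scala. Again there is no ready-made data, so we define some test data in AdClickLog.csv and use it to generate the stream of ad click events.

In the code we first define the case class AdClickLog for the source data and the case class CountByProvince for the output statistics. The main function first keys the stream by province, then opens a one-hour time window with a 5-second slide and counts the click events in each window. The implementation is as follows: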


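import java.text.SimpleDateFormat
import java.util.Date

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class AdClickLog(userId: Long, adId: Long, province: String, city: String, timestamp: Long)

case class CountByProvince(windowEnd: String, province: String, count: Long)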


object AdStatisticsByGeo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Backslashes must be escaped inside a Scala string literal
    val adLogStream: DataStream[AdClickLog] = env.readTextFile("YOURPATH\\resources\\AdClickLog.csv")
      .map(data => {
        val dataArray = data.split(",")
        AdClickLog(dataArray(0).toLong, dataArray(1).toLong, dataArray(2), dataArray(3), dataArray(4).toLong)
      })
      // The * 1000L converts second timestamps to the milliseconds Flink expects
      .assignAscendingTimestamps(_.timestamp * 1000L)

    val adCountStream = adLogStream
      .keyBy(_.province)
      .timeWindow(Time.minutes(60), Time.seconds(5))
      .aggregate(new CountAgg(), new CountResult())
      .print()

    env.execute("ad statistics job")
  }
}


class CountAgg() extends AggregateFunction[AdClickLog, Long, Long] {
  override def add(value: AdClickLog, accumulator: Long): Long = accumulator + 1L
  override def createAccumulator(): Long = 0L
  override def getResult(accumulator: Long): Long = accumulator
  override def merge(a: Long, b: Long): Long = a + b
}

class CountResult() extends WindowFunction[Long, CountByProvince, String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[CountByProvince]): Unit = {
    out.collect(CountByProvince(formatTs(window.getEnd), key, input.iterator.next()))
  }

  private def formatTs(ts: Long) = {
    val df = new SimpleDateFormat("yyyy/MM/dd-HH:mm:ss")
    df.format(new Date(ts))
  }
}
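The counting aggregate above keeps only a running count per key and window rather than buffering every click. A minimal plain-Scala sketch of that contract follows. The trait Agg is a hypothetical stand-in that mirrors the four methods of the aggregate function and is not Flink's interface, the object and helper names are made up for illustration, and the sample CSV lines in main are invented test data.

```scala
object IncrementalCountSketch {
  // Hypothetical stand-in for the shape of an aggregate: IN = input, ACC = accumulator, OUT = result
  trait Agg[IN, ACC, OUT] {
    def createAccumulator(): ACC
    def add(value: IN, acc: ACC): ACC
    def getResult(acc: ACC): OUT
    def merge(a: ACC, b: ACC): ACC
  }

  case class AdClickLog(userId: Long, adId: Long, province: String, city: String, timestamp: Long)

  object CountAgg extends Agg[AdClickLog, Long, Long] {
    def createAccumulator(): Long = 0L
    def add(value: AdClickLog, acc: Long): Long = acc + 1L
    def getResult(acc: Long): Long = acc
    def merge(a: Long, b: Long): Long = a + b
  }

  // Parses one CSV line, using the same column order as the map step of the job
  def parse(line: String): AdClickLog = {
    val f = line.split(",").map(_.trim)
    AdClickLog(f(0).toLong, f(1).toLong, f(2), f(3), f(4).toLong)
  }

  // Folds the clicks of each province through the aggregate, one element at a time
  def countByProvince(clicks: Seq[AdClickLog]): Map[String, Long] =
    clicks.groupBy(_.province).map { case (province, cs) =>
      val acc = cs.foldLeft(CountAgg.createAccumulator())((a, c) => CountAgg.add(c, a))
      province -> CountAgg.getResult(acc)
    }

  def main(args: Array[String]): Unit = {
    // Invented sample lines: userId, adId, province, city, timestamp in seconds
    val lines = Seq(
      "1,100,beijing,beijing,1511658000",
      "2,100,beijing,beijing,1511658001",
      "3,200,shanghai,shanghai,1511658002")
    println(countByProvince(lines.map(parse)))
  }
}
```

In the job itself, the window function that follows the aggregate receives only the final count, which is why it reads a single element from its input.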


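Blacklist filtering

The click counts in the previous section add up repeated clicks by the same user. In practice a user may well open the same ad several times, which also signals stronger interest in it; but a user who clicks an ad very frequently within a short period is clearly not behaving normally and may be inflating click counts. We can therefore constrain user clicks over a period (say, one day): if a user clicks the same ad more than a set limit (say, 100 times), the user is added to a blacklist and an alert is raised, and that user's later clicks are no longer counted.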
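Independent of Flink, the rule itself can be sketched in plain Scala as a function over an ordered list of clicks. This is a simplified illustration under stated assumptions: the names BlackListSketch, Click, Warning, and filter are hypothetical, state lives in ordinary in-memory collections rather than keyed state, and the daily reset is left out.

```scala
import scala.collection.mutable

object BlackListSketch {
  case class Click(userId: Long, adId: Long)
  case class Warning(userId: Long, adId: Long, msg: String)

  // Processes clicks in order and returns (clicks that still count, warnings raised).
  // A (user, ad) pair may contribute at most maxCount clicks; later clicks are dropped.
  def filter(clicks: Seq[Click], maxCount: Long): (Seq[Click], Seq[Warning]) = {
    val counts = mutable.Map.empty[(Long, Long), Long].withDefaultValue(0L)
    val warned = mutable.Set.empty[(Long, Long)]
    val passed = Seq.newBuilder[Click]
    val warnings = Seq.newBuilder[Warning]
    for (c <- clicks) {
      val key = (c.userId, c.adId)
      if (counts(key) >= maxCount) {
        // Over the limit: drop the click and warn only once per (user, ad) pair
        if (warned.add(key)) warnings += Warning(c.userId, c.adId, s"Click over $maxCount times today.")
      } else {
        counts(key) = counts(key) + 1
        passed += c
      }
    }
    (passed.result(), warnings.result())
  }

  def main(args: Array[String]): Unit = {
    val clicks = Seq.fill(5)(Click(1L, 10L)) :+ Click(2L, 10L)
    val (passed, warnings) = filter(clicks, maxCount = 3L)
    println(s"passed=${passed.size}, warnings=$warnings")
  }
}
```

Keying on the (user, ad) pair rather than on the user alone means that a user blocked for one ad still has clicks on other ads counted.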


The implementation is as follows:


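import java.text.SimpleDateFormat
import java.util.Date

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class AdClickLog(userId: Long, adId: Long, province: String, city: String, timestamp: Long)

case class CountByProvince(windowEnd: String, province: String, count: Long)

case class BlackListWarning(userId: Long, adId: Long, msg: String)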


object AdStatisticsByGeo {
  // Side-output tag for blacklist warnings; the id "blacklist" is an assumed label
  val blackListOutputTag = new OutputTag[BlackListWarning]("blacklist")

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Backslashes must be escaped inside a Scala string literal
    val adLogStream: DataStream[AdClickLog] =
      env.readTextFile("D:\\Projects\\BigData\\UserBehaviorAnalysis\\MarketAnalysis\\src\\main\\resources\\AdClickLog.csv")
        .map(data => {
          val dataArray = data.split(",")
          AdClickLog(dataArray(0).toLong, dataArray(1).toLong, dataArray(2), dataArray(3), dataArray(4).toLong)
        })
        .assignAscendingTimestamps(_.timestamp * 1000L)

    // Drop clicks from blacklisted (user, ad) pairs before counting
    val filterBlackListStream = adLogStream
      .keyBy(logData => (logData.userId, logData.adId))
      .process(new FilterBlackListUser(100))

    val adCountStream = filterBlackListStream
      .keyBy(_.province)
      .timeWindow(Time.minutes(60), Time.seconds(5))
      .aggregate(new countAgg(), new countResult())
      .print()

    // Warnings arrive on the side output
    filterBlackListStream.getSideOutput(blackListOutputTag)
      .print("black list")

    env.execute("ad statistics job")
  }


  class FilterBlackListUser(maxCount: Long) extends KeyedProcessFunction[(Long, Long), AdClickLog, AdClickLog] {
    // Click count of the current user on the current ad
    lazy val countState: ValueState[Long] = getRuntimeContext.getState(
      new ValueStateDescriptor[Long]("count-state", classOf[Long]))
    // Whether a warning for the current (user, ad) key has already been sent to the blacklist
    lazy val firstSent: ValueState[Boolean] = getRuntimeContext.getState(
      new ValueStateDescriptor[Boolean]("firstsent-state", classOf[Boolean]))
    // Timestamp at which the timer fires and the state is cleared
    lazy val resetTime: ValueState[Long] = getRuntimeContext.getState(
      new ValueStateDescriptor[Long]("resettime-state", classOf[Long]))

    override def processElement(value: AdClickLog,
                                ctx: KeyedProcessFunction[(Long, Long), AdClickLog, AdClickLog]#Context,
                                out: Collector[AdClickLog]): Unit = {
      val curCount = countState.value()
      // On the first click, register a timer that clears the state at the next 00:00
      // (a day boundary in UTC, since epoch milliseconds are divided by the length of a day)
      if (curCount == 0) {
        val ts = (ctx.timerService().currentProcessingTime() / (24 * 60 * 60 * 1000) + 1) * (24 * 60 * 60 * 1000)
        resetTime.update(ts)
        ctx.timerService().registerProcessingTimeTimer(ts)
      }
      // Once maxCount clicks have been counted, blacklist the key: emit one warning
      // on the side output and drop this and all later clicks
      if (curCount >= maxCount) {
        if (!firstSent.value()) {
          firstSent.update(true)
          ctx.output(blackListOutputTag, BlackListWarning(value.userId, value.adId, "Click over " + maxCount + " times today."))
        }
        return
      }
      // Otherwise add 1 to the click count and pass the click on
      countState.update(curCount + 1)
      out.collect(value)
    }

    override def onTimer(timestamp: Long,
                         ctx: KeyedProcessFunction[(Long, Long), AdClickLog, AdClickLog]#OnTimerContext,
                         out: Collector[AdClickLog]): Unit = {
      if (timestamp == resetTime.value()) {
        firstSent.clear()
        countState.clear()
      }
    }
  }


}


class countAgg() extends AggregateFunction[AdClickLog, Long, Long] {
  override def add(value: AdClickLog, accumulator: Long): Long = accumulator + 1L
  override def createAccumulator(): Long = 0L
  override def getResult(accumulator: Long): Long = accumulator
  override def merge(a: Long, b: Long): Long = a + b
}

class countResult() extends WindowFunction[Long, CountByProvince, String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[CountByProvince]): Unit = {
    out.collect(CountByProvince(formatTs(window.getEnd), key, input.iterator.next()))
  }

  private def formatTs(ts: Long) = {
    val df = new SimpleDateFormat("yyyy/MM/dd-HH:mm:ss")
    df.format(new Date(ts))
  }
}
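The reset timer in the blacklist filter rounds the current processing time up to the next day boundary. Because the division works on epoch milliseconds, that boundary is midnight UTC rather than local midnight. The sketch below works through the arithmetic. The object DailyResetSketch and both functions are hypothetical names, and the offset variant is an assumption about how the formula could be adapted to a fixed time zone; it is not part of the original job.

```scala
object DailyResetSketch {
  val DayMs: Long = 24L * 60 * 60 * 1000

  // Next day boundary strictly after `now` (epoch millis), aligned to midnight UTC.
  // This is the same arithmetic the timer registration uses.
  def nextMidnightUtc(now: Long): Long = (now / DayMs + 1) * DayMs

  // Hypothetical variant for a fixed UTC offset, e.g. 8 hours for UTC+8:
  // shift into local time, round up to the next day, then shift back.
  def nextMidnight(now: Long, offsetMs: Long): Long =
    ((now + offsetMs) / DayMs + 1) * DayMs - offsetMs

  def main(args: Array[String]): Unit = {
    val now = System.currentTimeMillis()
    println(s"now=$now, next UTC midnight=${nextMidnightUtc(now)}")
    println(s"next UTC+8 midnight=${nextMidnight(now, 8L * 60 * 60 * 1000)}")
  }
}
```

A click that arrives exactly on a day boundary is scheduled for the following boundary, because the formula always adds one full day after the integer division.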

