大数据培训 | 电商用户行为分析之商业指标统计分析
模块创建和数据准备
继续在 UserBehaviorAnalysis 下新建一个 maven module 作为子项目,命名为 MarketAnalysis。
这个模块中我们没有现成的数据,所以会用自定义的测试源来产生测试数据流,或者直接用生成测试数据文件。
APP 市场推广统计
随着智能手机的普及,在如今的电商网站中已经有越来越多的用户来自移动端,相比起传统浏览器的登录方式,手机 APP 成为了更多用户访问电商网站的首选。对于电商企业来说,一般会通过各种不同的渠道对自己的 APP 进行市场推广,而这些渠道的统计数据(比如,不同网站上广告链接的点击量、APP 下载量)就成了市场营销的重要商业指标_大数据培训。
首 先 我 们 考 察 分 渠 道 的 市 场 推 广 统 计 。 在 src/main/scala 下 创 建 AppMarketingByChannel.scala 文件。由于没有现成的数据,所以我们需要自定义一个测试源来生成用户行为的事件流。
自定义测试数据源
定义一个源数据的样例类 MarketingUserBehavior,再定义一个 SourceFunction,用于产生用户行为源数据,命名为 SimulatedEventSource:case class MarketingUserBehavior(userId: Long, behavior: String, channel: String,timestamp: Long)
class SimulatedEventSource extends RichParallelSourceFunction[MarketingUserBehavior]
{
var running = true
val channelSet: Seq[String] = Seq("AppStore", "XiaomiStore", "HuaweiStore", "weibo","wechat", "tieba")
val behaviorTypes: Seq[String] = Seq("BROWSE", "CLICK", "PURCHASE", "UNINSTALL")
val rand: Random = Random
override def run(ctx: SourceContext[MarketingUserBehavior]): Unit = {
val maxElements = Long.MaxValue
var count = 0L
while (running && count < maxElements) {
val id = UUID.randomUUID().toString.toLong
val behaviorType = behaviorTypes(rand.nextInt(behaviorTypes.size))
val channel = channelSet(rand.nextInt(channelSet.size))
val ts = System.currentTimeMillis()
ctx.collectWithTimestamp(MarketingUserBehavior(id, behaviorType, channel, ts),
ts)
count += 1
TimeUnit.MILLISECONDS.sleep(5L)
}
}
override def cancel(): Unit = running = false
}
更多 Java –大数据 – 前端 – UI/UE - Android - 人工智能资料下载,可访问百度:尚硅谷官网 www.atguigu.com
分渠道统计
另 外 定 义 一 个 窗 口 处 理 的 输 出 结 果 样 例 类 MarketingViewCount , 并 自 定 义 ProcessWindowFunction 进行处理,代码如下:
case class MarketingCountView(windowStart: Long, windowEnd: Long, channel: String,
behavior: String, count: Long)
object AppMarketingByChannel {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val stream: DataStream[MarketingUserBehavior] = env.addSource(new
SimulatedEventSource)
.assignAscendingTimestamps(_.timestamp)
stream
.filter(_.behavior != "UNINSTALL")
.map(data => {
((data.channel, data.behavior), 1L)
})
.keyBy(_._1)
.timeWindow(Time.hours(1), Time.seconds(1))
.process(new MarketingCountByChannel())
.print()
env.execute(getClass.getSimpleName)
}
}
class MarketingCountByChannel() extends ProcessWindowFunction[((String, String),
Long), MarketingViewCount, (String, String), TimeWindow] {
override def process(key: (String, String),
context: Context,
elements: Iterable[((String, String), Long)],
out: Collector[MarketingViewCount]): Unit = {
val startTs = context.window.getStart
val endTs = context.window.getEnd
val channel = key._1
val behaviorType = key._2
val count = elements.size
out.collect( MarketingViewCount(formatTs(startTs), formatTs(endTs), channel,
behaviorType, count) )
}
private def formatTs (ts: Long) = {
val df = new SimpleDateFormat ("yyyy/MM/dd-HH:mm:ss")
df.format (new Date (ts) )
}
}
不分渠道(总量)统计
同样我们还可以考察不分渠道的市场推广统计,这样得到的就是所有渠道推广的总量。在 src/main/scala 下创建 AppMarketingStatistics.scala 文件,代码如下:
object AppMarketingStatistics {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val stream: DataStream[MarketingUserBehavior] = env.addSource(new
SimulatedEventSource)
.assignAscendingTimestamps(_.timestamp)
stream
.filter(_.behavior != "UNINSTALL")
.map(data => {
("dummyKey", 1L)
})
.keyBy(_._1)
.timeWindow(Time.hours(1), Time.seconds(1))
.process(new MarketingCountTotal())
.print()
env.execute(getClass.getSimpleName)
}
}
class MarketingCountTotal() extends ProcessWindowFunction[(String, Long),
MarketingViewCount, String, TimeWindow]{
override def process(key: String, context: Context, elements: Iterable[(String, Long)],
out: Collector[MarketingViewCount]): Unit = {
val startTs = context.window.getStart
val endTs = context.window.getEnd
val count = elements.size
out.collect( MarketingViewCount(formatTs(startTs), formatTs(endTs), "total","total", count) )
}
private def formatTs (ts: Long) = {
val df = new SimpleDateFormat ("yyyy/MM/dd-HH:mm:ss")
df.format (new Date (ts) )
}
}
页面广告分析
电商网站的市场营销商业指标中,除了自身的 APP 推广,还会考虑到页面上的广告投放(包括自己经营的产品和其它网站的广告)。所以广告相关的统计分析,也是市场营销的重要指标。
对于广告的统计,最简单也最重要的就是页面广告的点击量,网站往往需要根据广告点击量来制定定价策略和调整推广方式,而且也可以借此收集用户的偏好信息。更加具体的应用是,我们可以根据用户的地理位置进行划分,从而总结出不同省份用户对不同广告的偏好,这样更有助于广告的精准投放。
更多 Java –大数据 – 前端 – UI/UE - Android - 人工智能资料下载,可访问百度:尚硅谷官网 www.atguigu.com
页面广告点击量统计
接下来我们就进行页面广告按照省份划分的点击量的统计。在 src/main/scala 下创建 AdStatisticsByGeo.scala 文件。同样由于没有现成的数据,我们定义一些测试数据,放在 AdClickLog.csv 中,用来生成用户点击广告行为的事件流。
在代码中我们首先定义源数据的样例类 AdClickLog,以及输出统计数据的样例类 CountByProvince。主函数中先以 province 进行 keyBy,然后开一小时的时间窗口,滑动距离为 5 秒,统计窗口内的点击事件数量。具体代码实现如下:
case class AdClickLog(userId: Long, adId: Long, province: String, city: String, timestamp:Long)
case class CountByProvince(windowEnd: String, province: String, count: Long)
object AdStatisticsByGeo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val adLogStream: DataStream[AdClickLog] =
env.readTextFile("YOURPATH\resources\AdClickLog.csv")
.map(data => {
val dataArray = data.split(",")
AdClickLog(dataArray(0).toLong, dataArray(1).toLong, dataArray(2),
dataArray(3), dataArray(4).toLong)
})
.assignAscendingTimestamps(_.timestamp * 1000L)
val adCountStream = adLogStream
.keyBy(_.province)
.timeWindow(Time.minutes(60), Time.seconds(5))
.aggregate(new CountAgg(), new CountResult())
.print()
env.execute("ad statistics job")
}
}
class CountAgg() extends AggregateFunction[AdClickLog, Long, Long]{
override def add(value: AdClickLog, accumulator: Long): Long = accumulator + 1L
override def createAccumulator(): Long = 0L
override def getResult(accumulator: Long): Long = accumulator
override def merge(a: Long, b: Long): Long = a + b
}
class CountResult() extends WindowFunction[Long, CountByProvince, String, TimeWindow]{
override def apply(key: String, window: TimeWindow, input: Iterable[Long], out:
Collector[CountByProvince]): Unit = {
out.collect(CountByProvince(formatTs(window.getEnd), key, input.iterator.next()))
}
private def formatTs (ts: Long) = {
val df = new SimpleDateFormat ("yyyy/MM/dd-HH:mm:ss")
df.format (new Date (ts) )
}
}
黑名单过滤
上节我们进行的点击量统计,同一用户的重复点击是会叠加计算的。在实际场景中,同一用户确实可能反复点开同一个广告,这也说明了用户对广告更大的兴趣;但是如果用户在一段时间非常频繁地点击广告,这显然不是一个正常行为,有刷点击量的嫌疑。所以我们可以对一段时间内(比如一天内)的用户点击行为进行约束,如果对同一个广告点击超过一定限额(比如 100 次),应该把该用户加入黑名单并报警,此后其点击行为不应该再统计_大数据视频。
具体代码实现如下:
case class AdClickLog(userId: Long, adId: Long, province: String, city: String, timestamp:
Long)
case class CountByProvince(windowEnd: String, province: String, count: Long)
case class BlackListWarning(userId: Long, adId: Long, msg: String)
object AdStatisticsByGeo {
val blackListOutputTag = new OutputTagBlackListWarning
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val adLogStream: DataStream[AdClickLog] =
env.readTextFile("D:\Projects\BigData\UserBehaviorAnalysis\MarketAnalysis\src\
\main\resources\AdClickLog.csv")
.map(data => {
val dataArray = data.split(",")
AdClickLog(dataArray(0).toLong, dataArray(1).toLong, dataArray(2),
dataArray(3), dataArray(4).toLong)
})
.assignAscendingTimestamps(_.timestamp * 1000L)
val filterBlackListStream = adLogStream
.keyBy(logData => (logData.userId, logData.adId))
.process(new FilterBlackListUser(100))
val adCountStream = filterBlackListStream
.keyBy(_.province)
.timeWindow(Time.minutes(60), Time.seconds(5))
.aggregate(new countAgg(), new countResult())
.print()
filterBlackListStream.getSideOutput(blackListOutputTag)
.print("black list")
env.execute("ad statistics job")
}
class FilterBlackListUser(maxCount: Long) extends KeyedProcessFunction[(Long, Long),
AdClickLog, AdClickLog] {
// 保存当前用户对当前广告的点击量
lazy val countState: ValueState[Long] = getRuntimeContext.getState(new
ValueStateDescriptor[Long]("count-state", classOf[Long]))
// 标记当前(用户,广告)作为 key 是否第一次发送到黑名单
lazy val firstSent: ValueState[Boolean] = getRuntimeContext.getState(new
ValueStateDescriptor[Boolean]("firstsent-state", classOf[Boolean]))
// 保存定时器触发的时间戳,届时清空重置状态
lazy val resetTime: ValueState[Long] = getRuntimeContext.getState(new
ValueStateDescriptor[Long]("resettime-state", classOf[Long]))
override def processElement(value: AdClickLog, ctx: KeyedProcessFunction[(Long,
Long), AdClickLog, AdClickLog]#Context, out: Collector[AdClickLog]): Unit = {
val curCount = countState.value()
// 如果是第一次处理,注册一个定时器,每天 00:00 触发清除
if( curCount == 0 ){
val ts = (ctx.timerService().currentProcessingTime() / (246060*1000) + 1) *
(246060*1000)
resetTime.update(ts)
ctx.timerService().registerProcessingTimeTimer(ts)
}
// 如果计数已经超过上限,则加入黑名单,用侧输出流输出报警信息
if( curCount > maxCount ){
if( !firstSent.value() ){
firstSent.update(true)
ctx.output( blackListOutputTag, BlackListWarning(value.userId, value.adId,
"Click over " + maxCount + "times today.") )
}
return
}
更多 Java –大数据 – 前端 – UI/UE - Android - 人工智能资料下载,可访问百度:尚硅谷官网 www.atguigu.com
// 点击计数加 1
countState.update(curCount + 1)
out.collect( value )
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[(Long, Long),
AdClickLog, AdClickLog]#OnTimerContext, out: Collector[AdClickLog]): Unit = {
if( timestamp == resetTime.value() ){
firstSent.clear()
countState.clear()
}
}
}
}
class countAgg() extends AggregateFunction[AdClickLog, Long, Long] {
override def add(value: AdClickLog, accumulator: Long): Long = accumulator + 1L
override def createAccumulator(): Long = 0L
override def getResult(accumulator: Long): Long = accumulator
override def merge(a: Long, b: Long): Long = a + b
}
class countResult() extends WindowFunction[Long, CountByProvince, String, TimeWindow]
{
override def apply(key: String, window: TimeWindow, input: Iterable[Long], out:
Collector[CountByProvince]): Unit = {
out.collect(CountByProvince(formatTs(window.getEnd), key, input.iterator.next()))
}
private def formatTs(ts: Long) = {
val df = new SimpleDateFormat("yyyy/MM/dd-HH:mm:ss")
df.format(new Date(ts))
}
}
评论