第二届 Apache Flink 极客挑战赛冠军比赛攻略 _SkyPeaceLL 队
关联比赛: 第二届 Apache Flink极客挑战赛
赛题介绍
一、病例行动数据集
病例历史行动数据集(训练集 1) 1M+
确诊病例数据 (测试集 1) 500+
实时病例行动数据集(测试集 2) 1000+
二、天猫精灵行为数据集
天猫精灵历史行为数据集(训练集 2) 1M+
用户行为数据集(测试集 3) 500+
实时用户行为数据集(测试集 4) 1000+
四个任务
根据测试集 1 每条数据的特征向量,在训练集 1 中找出该病例(人)对应的所有记录。
对测试集 2 的每条数据,根据其特征向量进行实时分类(人)。
根据测试集 3 每条数据的特征向量,在训练集 2 中找出该用户行为(领域+意图)对应的所有记录。
对测试集 4 的每条数据,根据其特征向量进行实时分类(领域+意图)。
性能要求
Job 总运行时间不能超过 3 小时。
对每条实时数据完成实时分类的响应时间不能超过 500ms。
平台和组件
Flink,PyFlink,Flink ai_flow,达摩院 Proxima,Intel Zoo cluster serving
解决方案 - Workflow
解决方案 - Workflow config file
本方案中,代码框架设定了一个目标:对于新增的相似应用的数据集,不修改 python 代码,只需新增一个 workflow config file(yaml 文件),根据新数据集的基本属性以及数据结构的特性设置相应的配置即可。/code/package/wf_config_1.yaml --病例行动数据集对应的配置文件
/code/package/wf_config_2.yaml --天猫精灵行为数据集对应的配置文件
解决方案 - Data pre-processing
病例行动数据集
不含异常数据,且特征向量已经 L2 Normalization,不需要特别的预处理。
天猫精灵行为数据集
存在一些异常数据,需要做以下预处理:
移除某些特征向量数据末尾多出的空格(注:如果不做相应处理,score3 通常得 0 分)
Re-generate UUID for duplicated UUID
Processing zero vector
Processing duplicated vector
L2 Normalization
解决方案 - Model training
Model
Simple AutoEncoder (实测效果好,稳定,性能好,采用)
Deep AutoEncoder(实测效果好,性能一般,最终未采用)
VAE (Variational AutoEncoder) (实测效果相对较差,未采用)
PCA (Principal Component Analysis) (实测效果相对较差,未采用)
NMF (Non-negative matrix factorization) (实测效果相对较差,未采用)
模型关键参数
损失函数:MSE
激活函数:linear
降维的维度选择
病例行动数据集:512=>128
天猫精灵行为数据集:700=>128
解决方案 - Inference
Intel Zoo Cluster Serving
支持 Tensorflow Saved Model 以及 PyTorch Model for Inference
支持并发 Inference(本赛题设置为 16 个并发),在多并发下运行稳定
模型针对 CPU 做了优化,无需 GPU 环境
自动生成配置,方便部署
响应时间短。平均每个请求响应时间实测小于 35ms,充分满足本方案中的性能需求。
向量索引和向量检索
阿里达摩院 proxima
使用 Proxima HnswBuilder 创建索引,使用 HnswSearch search vector
支持海量数据向量检索
召回率高,Top100 召回率超过 98.5%
检索性能高,在本赛题中,平均每个请求(TopK=1024)的响应时间小于 3ms,完全满足 TopK 筛选+再聚类这样类型的应用需求,对于实时的向量检索也毫无压力。
解决方案 - 聚类算法
针对历史行动数据的聚类
根据指定要 search 的 vector,Topk=1024,通过 Proxima search 出 1024 个 UUIDs 只是作为初步筛选的 UUIDs。再使用 Pandas 查出它们对应的 vectors,然后将这些 vectors 和指定 search 的那个 vector 合并为 1025 个 vectors。
使用聚类算法 Chinese_Whispers,对上述 1025 个 vectors 进行聚类分组后,取出和指定 vector 属于相同组的所有 vectors 所对应的 UUIDs 输出。Chinese_Whispers 算法对于病例行动数据集效果最好。
另外尝试了 K-Means,发现基本不可行。尝试了 DBSCAN 聚类算法,可行,但效果不如 Chinese Whispers。
针对天猫精灵的数据集,还尝试了一种简单算法:Topk=128,从 search 出的 128 个 UUIDs 中根据 result.score(),设置一个阈值,将 result.score()<阈值的 UUIDs 全部输出。该实际评分效果要略微优于 Chinese_Whispers。
其它一些常见的聚类算法以及较新的 GCN(Graph Convolutional Network),因时间关系,计划赛后继续尝试。
针对实时行动数据的实时分类
根据指定的 vector,search Top1 UUID。
直接使用 Top1 UUID 作为分类 label 输出。(注:用此分类方法+合适的模型+online data 处理不超时,score2 可得满分 500 分)
聚类算法 - Chinese_Whispers
算法流程
1 初始化:将所有的样本点初始化为不同的类。
2 建图:构建无向图,以每个节点为一个类别,不同节点之间计算相似度,当相似度超过 threshold,将两个节点相连形成关联边,边的权重为相似度。
3 迭代:
3.1 随机选取一个节点 i 开始,在其相连节点中选取边权重最大者 j,并将 i 归为节点 j 类(若相连节点中有多个节点属于同一类,则将这些权重相加再做比较)
3.2 遍历所有节点后,重复迭代至满足迭代次数。
解决方案 – 解决 Online Data (Kafka) 超时的问题
方法一、使用 ai_flow 内建的算子
1 使用 ai_flow.read_example、ai_flow.predict、ai_flow.transform 和 ai_flow.write_example
2 在其中的 SourceExecutor/SinkExecutor 实现类中使用 PyFlink TABLE API(For Kafka) 读/写 Kafka Topic
3 为相应 Flink job 的 StreamExecutionEnvironment 设置参数:stream_env.enable_checkpointing(250)
该参数默认为 3000ms,3000ms 会导致每 3 秒才集中从 Kafka Topic 中读出 6 条数据。所以,如果不设置这个参数,必定会导致每 6 条数据中平均有 5 条会超时 500ms,使得实时数据(score2 和 score4)得分很难超过 100 分(满分 500 分),因此必须改变这个参数设置。针对本赛题,可以设置为 250ms。
方法一在产线上应用没什么问题,但是在本比赛中它有一个小问题,那就是初始会有 8 秒延迟,这个延迟会使得赛题程序开始发送的约 16 条数据被 TABLE API(For Kafka)读到时都会超时 500ms,从而对最终评分有所影响(实测大概影响 6 分左右)。
使用方法一可确保只会有少量的初始数据(实测 16 条左右)产生超时。
方法二、使用 ai_flow 的用户自定义算子
1 ai_flow 支持更为灵活的用户自定义算子 af.user_define_operation
2 在用户自定义算子的 Executor 实现类中,直接使用 Kafka Consumer/Producer 读写 Kafka Topic
3 直接通过 Kafka consumer 从 Kafka Topic 读取数据,然后 call Inference API (by Zoo cluster serving) 降维,然后使用 Proxima search API search Top1 UUID,然后得出分类 label,最后直接通过 Kafka Producer 将结果数据写入 Kafka Topic。
使用方法二可避免初始 16 条数据的超时问题,设置好关键参数 (如 fetch_max_wait_ms=200),可确保所有数据都不会超时。
总结和感想
本次比赛是算法+工程化问题。既要设计好算法,又要考虑实际工程需要。
模型算法:如果数据集的特征向量已经经过很好的处理,那么降维模型的模型可选择 MSE loss 损失小且 Inference 性能高的模型即可。而聚类算法要根据数据集特征向量的特性选择合适的算法。向量检索:对于海量数据,必须使用专门的向量检索组件。阿里 proxima 提供了高召回率且极短的响应时间。并行 Inference:如果服务器只有 CPU 环境,那么使用 Intel Zoo Cluster Serving 为 Inference 提供并行服务目前是非常好的选择。
实时数据处理:充分利用 Zoo Cluster Serving、Proxima 的性能优势以及 ai_flow user defined operation 的灵活优势,将实时数据处理效果最佳化。
代码框架:针对相似应用的新数据集,只需给出相应的新配置文件即可,无需改动 python code。
工程考虑:代码不仅考虑比赛得分效果,同时也考虑了通过配置的方式在不同应用场景下使用不同的实现。生产价值:一些基于向量检索的应用具有相似性,在这样的思路和不断改进下,它们应该可以泛化成通用的应用架构和代码框架,最终或许也可以实现为某一类软件产品或平台。另外,Zoo Cluster Serving 以及 Proxima 在无昂贵 GPU 仅有 CPU 的环境下,提供了高并发及高性能的特性。因此,充分使用了 Zoo Cluster Serving 以及 Proxima 的解决方案在一些实际生产系统的最终方案选择中,将具备很强的竞争力。
查看更多内容,欢迎访问天池技术圈官方地址:第二届Apache Flink极客挑战赛冠军比赛攻略_SkyPeaceLL队_天池技术圈-阿里云天池
评论