OpenMLDB + OneFlow: 手把手教你快速链接特征工程到模型训练
本文整理自 OpenMLDB Meetup No.5 中 OpenMLDB PMC 黄威的演讲,将以 京东高潜用户购买意向预测问题 为例,示范如何使用 OpenMLDB 和 OneFlow 联合来打造一个完整的机器学习应用。分享视频如下:
导读:
如何从历史数据中找出规律,去预测用户未来的购买需求,让最合适的商品遇见最需要的人,是大数据应用在精准营销中的关键问题,也是所有电商平台在做智能化升级时所需要的核心技术。京东作为中国最大的自营式电商,沉淀了数亿的忠实用户,积累了海量的真实数据。本案例以京东商城真实的用户、商品和行为数据(脱敏后)为基础,通过数据挖掘的技术和机器学习的算法,构建用户购买商品的预测模型,输出高潜用户和目标商品的匹配结果,为精准营销提供高质量的目标群体,挖掘数据背后潜在的意义,为电商用户提供更简单、快捷、省心的购物体验。本案例使用 OpenMLDB 进行数据挖掘,使用 OneFlow 中的 DeepFM 模型进行高性能训练推理,提供精准的商品推荐。
本案例基于 OpenMLDB 集群版进行教程演示。注意,本文档使用的是预编译好的 docker 镜像。如果希望在自己编译和搭建的 OpenMLDB 环境下进行测试,推荐针对 OpenMLDB 优化的 Spark 发行版。
(参考章节:https://openmldb.ai/docs/zh/main/deploy/compile.html )
环境准备和预备知识
OneFlow 工具包安装
OneFlow 工具依赖 GPU 的强大算力,所以请确保部署机器具备 Nvidia GPU,并且保证驱动版本 >=460.X.X 驱动版本需支持 CUDA 11.0。使用一下指令安装 OneFlow:
还需要安装以下 Python 工具包:
拉取和启动 OpenMLDB Docker 镜像
注意,请确保 Docker Engine 版本号 >= 18.03
拉取 OpenMLDB docker 镜像,并且运行相应容器
映射 demo 文件夹至/root/project,这里我们使用的路径为 demodir=/home/gtest/demo
上述镜像预装了 OpenMLDB 的工具等,我们需要进一步安装 Oneflow 推理所需依赖。
因为我们将在 OpenMLDB 的服务中嵌入 OneFlow 模型推理的预处理及调用,需要安装以下的依赖。
注意,本教程以下的 OpenMLDB 部分的演示命令默认均在该已经启动的 docker 容器内运行。OneFlow 命令默认在 1.1 安装的 OneFlow 环境下运行。
初始化环境
我们在镜像内提供了 init.sh 脚本帮助用户快速初始化环境,包括:
配置 zookeeper
启动集群版 OpenMLDB
启动 OpenMLDB CLI 客户端
注意,本教程大部分命令在 OpenMLDB CLI 下执行,为了跟普通 shell 环境做区分,在 OpenMLDB CLI 下执行的命令均使用特殊的提示符 >
。
预备知识:集群版的非阻塞任务
集群版的部分命令是非阻塞任务,包括在线模式的 LOAD DATA,以及离线模式的 LOAD DATA ,SELECT,SELECT INTO 命令。提交任务以后可以使用相关的命令如 SHOW JOBS, SHOW JOB 来查看任务进度,详情参见离线任务管理文档。
机器学习训练流程
流程概览
使用 OpenMLDB+OneFlow 进行机器学习训练可总结为以下大致步骤。
接下来会介绍每一个步骤的具体操作细节。
使用 OpenMLDB 进行离线特征抽取
创建数据库和数据表 ✦
以下命令均在 OpenMLDB CLI 环境下执行。
CREATE DATABASE JD_db;USE JD_db;CREATE TABLE action(reqId string, eventTime timestamp, ingestionTime timestamp, actionValue int);CREATE TABLE flattenRequest(reqId string, eventTime timestamp, main_id string, pair_id string, user_id string, sku_id string, time bigint, split_id int, time1 string);CREATE TABLE bo_user(ingestionTime timestamp, user_id string, age string, sex string, user_lv_cd string, user_reg_tm bigint);CREATE TABLE bo_action(ingestionTime timestamp, pair_id string, time bigint, model_id string, type string, cate string, br string);CREATE TABLE bo_product(ingestionTime timestamp, sku_id string, a1 string, a2 string, a3 string, cate string, br string);CREATE TABLE bo_comment(ingestionTime timestamp, dt bigint, sku_id string, comment_num int, has_bad_comment string, bad_comment_rate float);
也可使用 sql 脚本(/root/project/create_tables.sql)运行:
离线数据准备 ✦
首先,切换到离线执行模式。接着,导入数据作为离线数据,用于离线特征计算。
以下命令均在 OpenMLDB CLI 下执行。
USE JD_db;SET @@execute_mode='offline';LOAD DATA INFILE '/root/project/data/JD_data/action/.parquet' INTO TABLE action options(format='parquet', header=true, mode='append');LOAD DATA INFILE '/root/project/data/JD_data/flattenRequest_clean/.parquet' INTO TABLE flattenRequest options(format='parquet', header=true, mode='append');LOAD DATA INFILE '/root/project/data/JD_data/bo_user/.parquet' INTO TABLE bo_user options(format='parquet', header=true, mode='append');LOAD DATA INFILE '/root/project/data/JD_data/bo_action/.parquet' INTO TABLE bo_action options(format='parquet', header=true, mode='append');LOAD DATA INFILE '/root/project/data/JD_data/bo_product/.parquet' INTO TABLE bo_product options(format='parquet', header=true, mode='append');LOAD DATA INFILE '/root/project/data/JD_data/bo_comment/.parquet' INTO TABLE bo_comment options(format='parquet', header=true, mode='append');
或使用脚本执行:
并通过以下命令快速查询 jobs 状态:
注意,集群版 LOAD DATA
为非阻塞任务,可以使用命令 SHOW JOBS
查看任务运行状态,请等待任务运行成功( state
转至 FINISHED
状态),再进行下一步操作 。
特征设计 ✦
通常在设计特征前,用户需要根据机器学习的目标对数据进行分析,然后根据分析设计和调研特征。机器学习的数据分析和特征研究不是本文讨论的范畴,我们将不作展开。本文假定用户具备机器学习的基本理论知识,有解决机器学习问题的能力,能够理解 SQL 语法,并能够使用 SQL 语法构建特征。针对本案例,用户经过分析和调研设计了若干特征。请注意,在实际的机器学习特征调研过程中,科学家对特征进行反复试验,寻求模型效果最好的特征集。所以会不断的重复多次特征设计->离线特征抽取->模型训练过程,并不断调整特征以达到预期效果。
离线特征抽取 ✦
用户在离线模式下,进行特征抽取,并将特征结果输出到'/root/project/out/1'目录下保存(对应映射为 $demodir/out/1),以供后续的模型训练。 SELECT 命令对应了基于上述特征设计所产生的 SQL 特征计算脚本。以下命令均在 OpenMLDB CLI 下执行。
此处仅一个命令,可以使用阻塞式 LOAD DATA,直接运行 sql 脚本 sync_select_out.sql:
注意,集群版 LOAD DATA
为非阻塞任务,可以使用命令 SHOW JOBS
查看任务运行状态,请等待任务运行成功( state
转至 FINISHED
状态),再进行下一步操作 。
预处理数据集以配合 DeepFM 模型要求
注意,以下命令在 docker 外执行,使用安装了 1.1 所描述的 OneFlow 运行环境根据 DeepFM 论文, 类别特征和连续特征都被当作稀疏特征对待。
进入 demo 文件夹,运行以下指令进行数据处理
对应生成 parquet 数据集将生成在 $demodir/openmldb_process/out。数据信息将被打印如下,该信息将被输入为训练的配置文件。
启动 OneFlow 进行模型训练
注意,以下命令在安装 1.1 所描述的 OneFlow 运行环境中运行
修改对应 train-deepfm.sh 配置文件 ✦
开始模型训练 ✦
模型上线流程
流程概览
使用 OpenMLDB+OneFlow 进行模型 serving 可总结为以下大致步骤。接下来会介绍每一个步骤的具体操作细节。
配置 OpenMLDB 进行在线特征抽取
特征抽取 SQL 脚本上线 ✦
假定 2.4 节中所设计的特征在上一步的模型训练中产出的模型符合预期,那么下一步就是将该特征抽取 SQL 脚本部署到线上去,以提供在线的特征抽取。
重新启动 OpenMLDB CLI,以进行 SQL 上线部署
执行上线部署,以下命令在 OpenMLDB CLI 内执行。
USE JD_db;SET @@execute_mode='online';deploy demo select * from(select
reqId
as reqId_1,eventTime
as flattenRequest_eventTime_original_0,reqId
as flattenRequest_reqId_original_1,pair_id
as flattenRequest_pair_id_original_24,sku_id
as flattenRequest_sku_id_original_25,user_id
as flattenRequest_user_id_original_26,distinct_count(pair_id
) over flattenRequest_user_id_eventTime_0_10_ as flattenRequest_pair_id_window_unique_count_27,fz_top1_ratio(pair_id
) over flattenRequest_user_id_eventTime_0_10_ as flattenRequest_pair_id_window_top1_ratio_28,fz_top1_ratio(pair_id
) over flattenRequest_user_id_eventTime_0s_14d_200 as flattenRequest_pair_id_window_top1_ratio_29,distinct_count(pair_id
) over flattenRequest_user_id_eventTime_0s_14d_200 as flattenRequest_pair_id_window_unique_count_32,case when !isnull(at(pair_id
, 0)) over flattenRequest_user_id_eventTime_0_10_ then count_where(pair_id
,pair_id
= at(pair_id
, 0)) over flattenRequest_user_id_eventTime_0_10_ else null end as flattenRequest_pair_id_window_count_35,dayofweek(timestamp(eventTime
)) as flattenRequest_eventTime_dayofweek_41,case when 1 < dayofweek(timestamp(eventTime
)) and dayofweek(timestamp(eventTime
)) < 7 then 1 else 0 end as flattenRequest_eventTime_isweekday_43fromflattenRequest
window flattenRequest_user_id_eventTime_0_10_ as (partition byuser_id
order byeventTime
rows between 10 preceding and 0 preceding),flattenRequest_user_id_eventTime_0s_14d_200 as (partition byuser_id
order byeventTime
rows_range between 14d preceding and 0s preceding MAXSIZE 200))as out0last join(selectflattenRequest
.reqId
as reqId_3,action_reqId
.actionValue
as action_actionValue_multi_direct_2,bo_product_sku_id
.a1
as bo_product_a1_multi_direct_3,bo_product_sku_id
.a2
as bo_product_a2_multi_direct_4,bo_product_sku_id
.a3
as bo_product_a3_multi_direct_5,bo_product_sku_id
.br
as bo_product_br_multi_direct_6,bo_product_sku_id
.cate
as bo_product_cate_multi_direct_7,bo_product_sku_id
.ingestionTime
as bo_product_ingestionTime_multi_direct_8,bo_user_user_id
.age
as bo_user_age_multi_direct_9,bo_user_user_id
.ingestionTime
as bo_user_ingestionTime_multi_direct_10,bo_user_user_id
.sex
as bo_user_sex_multi_direct_11,bo_user_user_id
.user_lv_cd
as bo_user_user_lv_cd_multi_direct_12fromflattenRequest
last joinaction
asaction_reqId
onflattenRequest
.reqId
=action_reqId
.reqId
last joinbo_product
asbo_product_sku_id
onflattenRequest
.sku_id
=bo_product_sku_id
.sku_id
last joinbo_user
asbo_user_user_id
onflattenRequest
.user_id
=bo_user_user_id
.user_id
)as out1on out0.reqId_1 = out1.reqId_3last join(selectreqId
as reqId_14,max(bad_comment_rate
) over bo_comment_sku_id_ingestionTime_0s_64d_100 as bo_comment_bad_comment_rate_multi_max_13,min(bad_comment_rate
) over bo_comment_sku_id_ingestionTime_0_10_ as bo_comment_bad_comment_rate_multi_min_14,min(bad_comment_rate
) over bo_comment_sku_id_ingestionTime_0s_64d_100 as bo_comment_bad_comment_rate_multi_min_15,distinct_count(comment_num
) over bo_comment_sku_id_ingestionTime_0s_64d_100 as bo_comment_comment_num_multi_unique_count_22,distinct_count(has_bad_comment
) over bo_comment_sku_id_ingestionTime_0s_64d_100 as bo_comment_has_bad_comment_multi_unique_count_23,fz_topn_frequency(has_bad_comment
, 3) over bo_comment_sku_id_ingestionTime_0s_64d_100 as bo_comment_has_bad_comment_multi_top3frequency_30,fz_topn_frequency(comment_num
, 3) over bo_comment_sku_id_ingestionTime_0s_64d_100 as bo_comment_comment_num_multi_top3frequency_33from(selecteventTime
asingestionTime
, bigint(0) asdt
,sku_id
assku_id
, int(0) ascomment_num
, '' ashas_bad_comment
, float(0) asbad_comment_rate
, reqId fromflattenRequest
)window bo_comment_sku_id_ingestionTime_0s_64d_100 as (UNION (selectingestionTime
,dt
,sku_id
,comment_num
,has_bad_comment
,bad_comment_rate
, '' as reqId frombo_comment
) partition bysku_id
order byingestionTime
rows_range between 64d preceding and 0s preceding MAXSIZE 100 INSTANCE_NOT_IN_WINDOW),bo_comment_sku_id_ingestionTime_0_10_ as (UNION (selectingestionTime
,dt
,sku_id
,comment_num
,has_bad_comment
,bad_comment_rate
, '' as reqId frombo_comment
) partition bysku_id
order byingestionTime
rows between 10 preceding and 0 preceding INSTANCE_NOT_IN_WINDOW))as out2on out0.reqId_1 = out2.reqId_14last join(selectreqId
as reqId_17,fz_topn_frequency(br
, 3) over bo_action_pair_id_ingestionTime_0s_10h_100 as bo_action_br_multi_top3frequency_16,fz_topn_frequency(cate
, 3) over bo_action_pair_id_ingestionTime_0s_10h_100 as bo_action_cate_multi_top3frequency_17,fz_topn_frequency(model_id
, 3) over bo_action_pair_id_ingestionTime_0s_7d_100 as bo_action_model_id_multi_top3frequency_18,distinct_count(model_id
) over bo_action_pair_id_ingestionTime_0s_14d_100 as bo_action_model_id_multi_unique_count_19,distinct_count(model_id
) over bo_action_pair_id_ingestionTime_0s_7d_100 as bo_action_model_id_multi_unique_count_20,distinct_count(type
) over bo_action_pair_id_ingestionTime_0s_14d_100 as bo_action_type_multi_unique_count_21,fz_topn_frequency(type
, 3) over bo_action_pair_id_ingestionTime_0s_7d_100 as bo_action_type_multi_top3frequency_40,fz_topn_frequency(type
, 3) over bo_action_pair_id_ingestionTime_0s_14d_100 as bo_action_type_multi_top3frequency_42from(selecteventTime
asingestionTime
,pair_id
aspair_id
, bigint(0) astime
, '' asmodel_id
, '' astype
, '' ascate
, '' asbr
, reqId fromflattenRequest
)window bo_action_pair_id_ingestionTime_0s_10h_100 as (UNION (selectingestionTime
,pair_id
,time
,model_id
,type
,cate
,br
, '' as reqId frombo_action
) partition bypair_id
order byingestionTime
rows_range between 10h preceding and 0s preceding MAXSIZE 100 INSTANCE_NOT_IN_WINDOW),bo_action_pair_id_ingestionTime_0s_7d_100 as (UNION (selectingestionTime
,pair_id
,time
,model_id
,type
,cate
,br
, '' as reqId frombo_action
) partition bypair_id
order byingestionTime
rows_range between 7d preceding and 0s preceding MAXSIZE 100 INSTANCE_NOT_IN_WINDOW),bo_action_pair_id_ingestionTime_0s_14d_100 as (UNION (selectingestionTime
,pair_id
,time
,model_id
,type
,cate
,br
, '' as reqId frombo_action
) partition bypair_id
order byingestionTime
rows_range between 14d preceding and 0s preceding MAXSIZE 100 INSTANCE_NOT_IN_WINDOW))as out3on out0.reqId_1 = out3.reqId_17;
可使用如下命令确认 deploy 信息
在线数据准备 ✦
首先,请切换到在线执行模式。接着在在线模式下,导入数据作为在线数据,用于在线特征计算。以下命令均在 OpenMLDB CLI 下执行。
USE JD_db;SET @@execute_mode='online';LOAD DATA INFILE '/root/project/data/JD_data/action/.parquet' INTO TABLE action options(format='parquet', header=true, mode='append');LOAD DATA INFILE '/root/project/data/JD_data/flattenRequest_remove_last/.parquet' INTO TABLE flattenRequest options(format='parquet', header=true, mode='append');LOAD DATA INFILE '/root/project/data/JD_data/bo_user/.parquet' INTO TABLE bo_user options(format='parquet', header=true, mode='append');LOAD DATA INFILE '/root/project/data/JD_data/bo_action/.parquet' INTO TABLE bo_action options(format='parquet', header=true, mode='append');LOAD DATA INFILE '/root/project/data/JD_data/bo_product/.parquet' INTO TABLE bo_product options(format='parquet', header=true, mode='append');LOAD DATA INFILE '/root/project/data/JD_data/bo_comment/.parquet' INTO TABLE bo_comment options(format='parquet', header=true, mode='append');
配置 OneFlow 推理服务
Oneflow 的推理服务需要 One Embedding 的支持。该支持目前还没有合入主框架中。若需要重新编译,可参考附录 A 进行编译测试。接下来步骤默认相关支持已编译完成,并且存放在/home/gtest/work/oneflow_serving/路径中。
离线特征检查模型路径($demodir/oneflow_process/model)中模型文件及组织方式是否正确 ✦
3.3.1
确认 config.pbtxt 中的配置正确 ✦
确认 persistent 路径($demodir/oneflow_process/persistent)正确 ✦
启动推理服务
启动 OneFlow 推理服务 ✦
使用一下命令启动 OneFlow 推理服务:
启动在线推理服务 demo ✦
OpenMLDB 的在线特征计算服务已通过 SQL 上线完成,OneFlow 推理服务也已经启动。这个 demo 将串联两者,在收到实时请求后,访问 OpenMLDB 进行特征抽取,再访问 OneFlow 推理服务,进行在线推理,最后返回推理结果。
如果尚未退出 OpenMLDB CLI,请使用 quit 命令退出 OpenMLDB CLI。
在普通命令行下启动预估服务:
发送预估请求
预估请求可在 OpenMLDB 的容器外执行。容器外部访问的具体信息可参见 IP 配置。在普通命令行下执行内置的 predict.py 脚本。该脚本发送一行请求数据到预估服务,接收返回的预估结果,并打印出来。
范例输出:
附录– OneFlow 定制代码编译
此章节介绍为 One Embedding 推理服务所定制的代码修改的编译过程。该代码会尽快合入 OneFlow 中,届时以下步骤可省略。
容器环境准备
使用以下容器环境进行编译。该容器已经安装编译所需的依赖等。
启动容器并映射相关路径(此处可映射/home/work/gtest 至/root/project)。接下来的操作均在容器内进行。
编译 OneFlow
编译 Serving
上述命令中的/path/to/liboneflow_cpp/share 要替换成上边编译的 oneflow 的里面的路径,在{oneflow 路径}/build/liboneflow_cpp/share
测试 TritonServer
复制 backend 库文件:
在命令行启动 TritonServer 测试:
在另一个命令行运行如下指令,若成功,输出示例如下:
python $demodir/serving/client.py
0.045439958572387695(1, 1)[[b'0.025343:0']]
注意:
如果出现 libunwind.so.8 未找到需要用-v /lib/x86_64-linux-gnu:/unwind_path 映射一下 libunwind.so.8 所在目录,然后添加到 LD_LIBRARY_PATH 里面: ... bash -c 'LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/mylib:/unwind_path ...
如果出现 libcupti.so 未找到需要用-v /usr/local/cuda-11.7/extras/CUPTI/lib64:/cupti_path 映射一下 libcupti.so 所在目录,然后添加到 LD_LIBRARY_PATH 里面: ... bash -c 'LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/mylib:/cupti_path ..., 其中具体的 cuda 的路径按实际安装的位置,可以用 ldd {oneflow 路径}/build/liboneflow.so | grep cupti 来找到
希望本文能够帮大家快速理解掌握如何使用 OpenMLDB 和 OneFlow 联合来打造一个完整的机器学习应用,链接特征工程到模型训练的全流程。
如果想进一步了解 OpenMLDB 或者参与社区技术交流,可以通过以下渠道获得相关信息和互动~
Github: https://github.com/4paradigm/OpenMLDB
Email: contact@openmldb.ai
OpenMLDB 微信交流群:
评论