写点什么

etl engine 实现 redis 与 mysql 之间的数据同步

作者:weigeonlyyou
  • 2024-06-28
    吉林
  • 本文字数:3243 字

    阅读完需:约 11 分钟

etl engine 实现 redis与mysql之间的数据同步

etl engine 实现 redis 与 mysql 之间的数据同步

Redis 是一个开源的使用 C 语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value 数据库,因其读取速度快、也可用于消息队列使用等场景,已经成为项目中不可缺少的一部分。本案例是通过 etl engine 实现 redis 与 mysql 之间的数据同步。

需求

读 redis 写 mysql; 读 mysql 写 redis

前置条件

事先准备一个可读写 redis 服务器;一个可读写 mysql 服务器;读 redis 的 key 写到 mysql 的 t_redis_info 表;读 mysql 的 t_redis_info 表记录写到 redis


  • MySQL 模拟数据



CREATE TABLE t_redis_info ( id VARCHAR(32) NOT NULL, caption VARCHAR(50), tag VARCHAR(50), memo VARCHAR(100), writetime VARCHAR(19), PRIMARY KEY (id));INSERT INTO t_redis_info(id,caption,tag,memo,writetime) VALUES ('1','herbin_beer_550','啤酒','哈尔滨雪花550ML','2023-01-01 11:12:13');INSERT INTO t_redis_info(id,caption,tag,memo,writetime) VALUES ('2','qingdao_beer_550','啤酒','青岛纯生550ML','2023-01-02 01:02:03');INSERT INTO t_redis_info(id,caption,tag,memo,writetime) VALUES ('3','qingdao_beer_330','啤酒','青岛干啤330ML','2023-02-03 01:02:03');INSERT INTO t_redis_info(id,caption,tag,memo,writetime) VALUES ('4','herbin_beer_330','啤酒','哈尔滨勇闯天涯330ML','2023-02-03 01:02:03');INSERT INTO t_redis_info(id,caption,tag,memo,writetime) VALUES ('5','budweiser_beer_330','啤酒','美国百威330ML','2023-03-04 01:02:03');INSERT INTO t_redis_info(id,caption,tag,memo,writetime) VALUES ('6','wahaha_water_600','纯净水','娃哈哈600ML','2023-03-04 01:02:03');INSERT INTO t_redis_info(id,caption,tag,memo,writetime) VALUES ('7','nongfushanquan_water_600','纯净水','农夫山泉600ML','2023-03-05 01:02:03');
复制代码

配置模型图

配置文件内容

<?xml version="1.0" encoding="UTF-8"?><Graph runMode="1">
<Node id="DB_INPUT_TABLE_1" type="DB_INPUT_TABLE" fetchSize="1000" dbConnection="CONNECT_1" desc="读数据表" > <Script name="sqlScript"> <![CDATA[ SELECT caption AS k ,CONCAT(id,'_',caption,'_',memo,'_', tag) AS v FROM t_redis_info]]> </Script> </Node> <Node id="DB_OUTPUT_TABLE_1" type="DB_OUTPUT_TABLE" dbConnection="CONNECT_1" outputFields="id;caption;memo;tag;writetime" renameOutputFields="id;caption;memo;tag;writetime" desc="写数据表" > <Script name="sqlScript"> <![CDATA[INSERT INTO t_redis_info (id,caption,memo,tag,writetime) VALUES(?,?,?,?,?);]]> </Script> <BeforeOut> <![CDATA[package extimport ( "errors" "fmt" "strconv" "strings" "time" "github.com/tidwall/gjson" "github.com/tidwall/sjson" "etl-engine/etl/tool/extlibs/common")func RunScript(dataValue string) (result string, topErr error) { newRows := "" rows := gjson.Get(dataValue, "rows") for index, row := range rows.Array() { //增加一个字段名称为id的列 tmpStr, _ := sjson.Set(row.String(), "id", common.GetUUID() ) //将系统默认输出的value字段拆分,并创建多个字段 values := gjson.Get(row.String(),"value").String() vArr := strings.Split(values, ";") caption := vArr[1] memo := vArr[2] tag := vArr[3] tmpStr, _ = sjson.Set(tmpStr, "caption", caption ) tmpStr, _ = sjson.Set(tmpStr, "memo", memo ) tmpStr, _ = sjson.Set(tmpStr, "tag", tag ) tmpStr, _ = sjson.Set(tmpStr, "writetime", time.Now().Format("2006-01-02 15:04:05")) common.GetLogger().Infoln("新行数据结构tmpStr:",tmpStr) newRows, _ = sjson.SetRaw(newRows, "rows.-1", tmpStr) } return newRows, nil}]]> </BeforeOut> </Node> <Node id="REDIS_WRITER_1" type="REDIS_WRITER" nameServer="127.0.0.1:16379" password="******" db="1" isGetTTL="false" patternMatchKey="true" outputFields="k;v" renameOutputFields="key;value" desc="写redis" /> <Node id="REDIS_READER_1" type="REDIS_READER" nameServer="127.0.0.1:16379" password="******" db="1" isGetTTL="false" patternMatchKey="true" keys="*" desc="读redis" /> <Line from="DB_INPUT_TABLE_1" to="REDIS_WRITER_1" type="STANDARD" order="0" metadata="METADATA_1" id="LINE_STANDARD_1"/> <Line from="REDIS_READER_1" to="DB_OUTPUT_TABLE_1" type="STANDARD" order="1" metadata="METADATA_2" id="LINE_STANDARD_2"/> <Metadata id="METADATA_2" > <Field name="id" type="string" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/> <Field name="caption" type="string" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/> <Field name="memo" type="string" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/> <Field name="tag" type="string" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/> <Field name="writetime" type="string" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/> </Metadata> <Metadata id="METADATA_1" > <Field name="key" type="string" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/> <Field name="value" type="string" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/> </Metadata> <Connection id="CONNECT_1" type="MYSQL" dbURL="127.0.0.1:3306" database="db1" username="root" password="******" /></Graph>
复制代码

总结主要配置环节

  1. 配置串行执行任务

Graph 标签中 设置 runMode="1" ,

使下面两个任务流可以按 order 配置的顺序执行。

  1. 画两个任务流

两个连接线中 order 属性分别设置 0 和 1,任务执行行先执行 order 为 0 的任务,再执行 order 为 1 的任务。

第 1 个任务流(读 mysql -> 写 redis)

第 2 个任务流(读 redis -> 写 mysql)

  1. 第 1 个任务流

  • 读数据表节点设置

script 属性

SELECT caption AS k ,CONCAT(id,'',caption,'',memo,'_', tag) AS v FROM t_redis_info;

caption 为 redis 中的键名称,组合的 v 为 redis 中的键值内容.

  • 写 redis 节点设置

patternMatchKey="true"

outputFields 设置 k;v

renameOutputFields 设置 key;value

系统默认会为 redis 的输出数据流生成 key 和 value 两个字段的数据结构

  • 创建元数据

METADATA_0 结构是两个字段 key 和 value 连接线中 order 属性设置 0 ,元数据选择 METADATA_0

该元数据用于写 redis 节点输出数据流时使用。

  1. 第 2 个任务流

  • 读 redis 节点设置

patternMatchKey="true"

keys="*"

  • 写数据表节点设置

script 属性

INSERT INTO t_redis_info (id,caption,memo,tag,writetime) VALUES(?,?,?,?,?);

outputFields 设置 id;caption;memo;tag;writetime

注意,通过嵌入 go 脚本来重新处理输入数据流中的各字段,因此 outputFields 中设置的字段名称要跟脚本中创建的字段名称相符

renameOutputFields 设置 id;caption;memo;tag;writetime

注意 outputFields 和 renameOutputFields 字段个数保持一致

  • 嵌入 go 脚本,增加一个字段名称为 id,调用了内置函数生成 uuid

BeforeOut 标签中嵌入 go 脚本,目的是将输入数据流结构转换成目标表中的各字段结构。

  • 创建元数据

METADATA_1 结构是 5 个字段 id,caption,memo,tag,writetime

连接线中 order 属性设置 1 ,元数据选择 METADATA_1

该元数据用于写数据表节点输出数据流时使用。

用户头像

weigeonlyyou

关注

还未添加个人签名 2022-12-26 加入

还未添加个人简介

评论

发布
暂无评论
etl engine 实现 redis与mysql之间的数据同步_Go_weigeonlyyou_InfoQ写作社区