写点什么

flink 维表查询 redis 之 flink-connector-redis

作者:山里小龙
  • 2022 年 4 月 05 日
  • 本文字数:2554 字

    阅读完需:约 8 分钟

flink维表查询redis之flink-connector-redis

项目介绍

基于https://github.com/apache/bahir-flink.git 二次开发,相对 bahir 调整的内容有:支持 flink1.13,删除过期 Flink API、增加 Table/SQL API、 增加维表查询、增加查询缓存、统一使用过期策略、增加写入并发数等。


因 bahir 使用的 flink 接口版本较老,所以改动较大,开发过程中参考了腾讯云与阿里云两家产商的流计算产品,取两家之长,并增加了更丰富的功能,包括更多的 redis 操作命令和更多的 redis 服务类型,如:simple、sentinel、cluster。


支持 redis 的操作命令


插件地址:https://github.com/jeff-zou/flink-connector-redis.git

无法翻墙:https://gitee.com/jeff-zou/flink-connector-redis.git


使用方法:

命令行执行 mvn package -DskipTests 打包后,将生成的包 flink-connector-redis-1.0.10.jar 引入 flink lib 中即可,无需其它设置。


开发环境工程直接引用:

<dependency>
<groupId>io.github.jeff-zou</groupId>
<artifactId>flink-connector-redis</artifactId>
<version>1.0.10</version>
</dependency>
复制代码


使用说明

无需通过 primary key 来映射 redis 中的 Key,直接由 ddl 中的字段顺序来决定 Key,如:

create table sink_redis(username VARCHAR, passport VARCHAR)  with ('command'='set') 
其中username为key, passport为value.
create table sink_redis(name VARCHAR, subject VARCHAR, score VARCHAR) with ('command'='hset')
其中name为map结构的key, subject为field, score为value.
复制代码


with 参数说明

集群类型为 sentinel 时额外连接参数


数据类型转换



使用示例

维表查询

create table sink_redis(name varchar, level varchar, age varchar) with ( 'connector'='redis', 'host'='10.11.80.147','port'='7001', 'redis-mode'='single','password'='******','command'='hset');
-- 先在redis中插入数据,相当于redis命令: hset 3 3 100 --
insert into sink_redis select * from (values ('3', '3', '100'));

create table dim_table (name varchar, level varchar, age varchar) with ('connector'='redis', 'host'='10.11.80.147','port'='7001', 'redis-mode'='single', 'password'='*****','command'='hget', 'maxIdle'='2', 'minIdle'='1', 'lookup.cache.max-rows'='10', 'lookup.cache.ttl'='10', 'lookup.max-retries'='3');

-- 随机生成10以内的数据作为数据源 --
-- 其中有一条数据会是: username = 3 level = 3, 会跟上面插入的数据关联 --
create table source_table (username varchar, level varchar, proctime as procTime()) with ('connector'='datagen', 'rows-per-second'='1', 'fields.username.kind'='sequence', 'fields.username.start'='1', 'fields.username.end'='10', 'fields.level.kind'='sequence', 'fields.level.start'='1', 'fields.level.end'='10');
create table sink_table(username varchar, level varchar,age varchar) with ('connector'='print');
insert into
sink_table
select
s.username,
s.level,
d.age
from
source_table s
left join dim_table for system_time as of s.proctime as d on
d.name = s.username
and d.level = s.level;
-- username为3那一行会关联到redis内的值,输出为: 3,3,100

复制代码


DataStream 查询方式

示例代码路径: src/test/java/org.apache.flink.streaming.connectors.redis.datastream.DataStreamTest.java

hset 示例,相当于 redis 命令:*hset tom math 150*

Configuration configuration = new Configuration();
configuration.setString(REDIS_MODE, REDIS_CLUSTER);
configuration.setString(REDIS_COMMAND, RedisCommand.HSET.name());
RedisSinkMapper redisMapper = (RedisSinkMapper)RedisHandlerServices
.findRedisHandler(RedisMapperHandler.class, configuration.toMap())
.createRedisMapper(configuration);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
GenericRowData genericRowData = new GenericRowData(3);
genericRowData.setField(0, "tom");
genericRowData.setField(1, "math");
genericRowData.setField(2, "152");
DataStream<GenericRowData> dataStream = env.fromElements(genericRowData, genericRowData);
RedisCacheOptions redisCacheOptions = new RedisCacheOptions.Builder().setCacheMaxSize(100).setCacheTTL(10L).build();
FlinkJedisConfigBase conf = getLocalRedisClusterConfig();
RedisSinkFunction redisSinkFunction = new RedisSinkFunction<>(conf, redisMapper, redisCacheOptions);
dataStream.addSink(redisSinkFunction).setParallelism(1);
env.execute("RedisSinkTest");

复制代码

redis-cluster 写入示例

示例代码路径: src/test/java/org.apache.flink.streaming.connectors.redis.table.SQLTest.java<br>

set 示例,相当于 redis 命令: *set test test11*

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);
String ddl = "create table sink_redis(username VARCHAR, passport VARCHAR) with ( 'connector'='redis', " +
"'cluster-nodes'='10.11.80.147:7000,10.11.80.147:7001','redis- mode'='cluster','password'='******','command'='set')" ;
tEnv.executeSql(ddl);
String sql = " insert into sink_redis select * from (values ('test', 'test11'))";
TableResult tableResult = tEnv.executeSql(sql);
tableResult.getJobClient().get()
.getJobExecutionResult()
.get();
复制代码


开发与测试环境

ide: IntelliJ IDEA 
code format: google-java-format + Save Actions
code check: CheckStyle
flink 1.13
jdk1.8
如果需要flink 1.12版本支持,请切换到分支flink-1.12
复制代码


问题反馈

关注公众号“肌肉码农”,回复“好友”反馈问题。


用户头像

山里小龙

关注

还未添加个人签名 2018.11.10 加入

还未添加个人简介

评论

发布
暂无评论
flink维表查询redis之flink-connector-redis_山里小龙_InfoQ写作平台