写点什么

Flink+ice 实现可视化规则配置 (Demo)

作者:waitmoon
  • 2022 年 8 月 25 日
    浙江
  • 本文字数:2608 字

    阅读完需:约 9 分钟

ice 文档站:waitmoon.com/zh

1 Demo 仓库地址:

github:https://github.com/zjn-zjn/flink-ice

gitee:https://gitee.com/waitmoon/flink-ice

2 Demo 功能描述

  • 通过 netcat 制造输入流(nc -l 9000 windows:nc -l -p 9000)

  • flink 接收本地 9000 端口输入流,以回车(\n)分割单词

  • 输入流经过 IceProcessor 处理后打印结果流

3 项目搭建

使用 flink-quickstart-java 快速搭建 flink 项目

3.1 添加 ice 依赖

因 flink 为非 Spring 项目,需依赖 ice-core 并手动初始化,Spring 项目直接依赖 ice-client-spring-boot-starter 即可。

<dependency>   <groupId>com.waitmoon.ice</groupId>   <artifactId>ice-core</artifactId>   <version>${ice.version}</version></dependency>
复制代码
3.2 编写 StreamingJob
public class StreamingJob {    public static void main(String[] args) throws Exception {        // 创建 Flink 执行环境        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        //接收本地socket9000端口输入流,以回车分割单词        //通过netcat制造输入流 nc -l 9000 (windows nc -l -p 9000)        DataStreamSource<String> stream = env.socketTextStream("localhost", 9000, "\n");        //按照单词长度keyBy,使用IceProcessor并打印结果        stream.keyBy(String::length).process(new IceProcessor()).print().setParallelism(1);        //执行程序        env.execute("Flink Streaming Java API Skeleton");    }}
复制代码
3.3 编写 ice 算子 IceProcessor

在 static 代码块中初始化 ice 客户端,此处直接使用的自己部署的 ice-server 地址对应的 app:2

算子功能:将流内数据放入 roam,组装 pack 并执行 ice 规则处理(直接根据 iceId 触发,iceId 在 server 配置后台获取)

/** * ice算子 */public class IceProcessor extends KeyedProcessFunction<Integer, String, String> {    //ice 客户端    private static IceNioClient iceNioClient;    static {        //初始化ice客户端        try {            //配置远程server地址,app,以及节点扫描路径            //此处使用了自己搭建的server,后台地址 http://eg.waitmoon.com/config/list/2            iceNioClient = new IceNioClient(2, "waitmoon.com:18121", "com.waitmoon.flink.ice.node");            //启动ice客户端            iceNioClient.start();        } catch (Exception e) {            throw new RuntimeException(e);        }    }    @Override    public void processElement(String value, Context ctx, Collector<String> out) {        //组装IcePack        IcePack pack = new IcePack();        //设置要触发的iceId(配置后台中需要触发的ID)        //http://eg.waitmoon.com/config/detail/2/1081        pack.setIceId(1081);        //初始化roam,将单词和长度放入roam中        IceRoam roam = new IceRoam();        roam.put("input", value);        roam.put("length", ctx.getCurrentKey());        pack.setRoam(roam);        //同步执行        Ice.syncProcess(pack);        //执行完成后,获取roam中的result        String result = roam.getMulti("result");        if (result != null) {            //result不为空,将结果放入下游算子            out.collect(result);        }    }    @Override    public void close() {        if (iceNioClient != null) {            //清理ice 客户端            iceNioClient.destroy();            iceNioClient = null;        }    }}
复制代码
3.4 编写节点 ContainsFlow

节点功能:判断根据 key 去 roam 里拿的值是否在 set 中,是则返回 true,否则返回 false

/** * @author waitmoon * 过滤性质节点 * 判断值在不在集合中 */@Data@Slf4j@EqualsAndHashCode(callSuper = true)public class ContainsFlow extends BaseLeafRoamFlow {    //默认input    private String key = "input";    private Set<String> set;    @Override    protected boolean doRoamFlow(IceRoam roam) {        //判断roam中的key对应的值是否在集合中        return set.contains(roam.<String>getMulti(key));    }    @Override    public void afterPropertiesSet() {        log.info("ContainsFlow init with key:{}, set:{} nodeId:{}", key, set, this.getIceNodeId());    }    public NodeRunStateEnum errorHandle(IceContext ctx, Throwable t) {        log.error("error occur id:{} e:", this.findIceNodeId(), t);        return super.errorHandle(ctx, t);    }}
复制代码
3.5 编写节点 PutNone

节点功能:将 value 值放入 roam 的 key 中,不干扰流程(不返回 true/false)

/** * @author waitmoon * 不干扰流程性质节点 * 将一个值放入roam */@Data@EqualsAndHashCode(callSuper = true)public class PutNone extends BaseLeafRoamNone {    //默认result    private String key = "result";
private Object value;
@Override protected void doRoamNone(IceRoam roam) { //将value放到roam中 roam.putMulti(key, value); }}
复制代码

4 项目启动

4.1 netcat 制造输入流

mac/linux 使用 nc -l 9000 命令,windows 使用 nc -l -p 9000 命令 制造一个 socket 输入流

4.2 运行 StreamingJob

运行时可以看到 ice 客户端启动相关信息

5 编排 ice 规则

在 ice-server 后台编辑 ice 规则,用的是自己部署的 ice-server,地址:http://eg.waitmoon.com/config

5.1 新增 app
5.2 新增 ice

此处 Debug 填 2 表示只打印节点执行过程,pack 中的 iceId 即为此处的 ID,点击查看详情即可编排规则

5.3 编排 ice 规则

此编排实现逻辑:根据不同的输入单词,输出对应的结果到 roam 的 result 字段中供后续使用。

如输入 waitmoon,在管理员列表中,则输出"you are admin~"到 roam 的 result 字段


6 发布与执行

在编排完规则后切记要发布后才会将变更推送到客户端并生效!!!

在终端输入单词并回车

在 flink 项目日志里可以看到:

ice 打印了执行过程,[节点 ID:节点类名简称:节点执行结果:节点执行耗时]

flink 因为最后的 sink 是 print(),所以打印了对应的输出。


这时候你就可以随意的更改变更规则去实现自己的业务啦~~~


用户头像

waitmoon

关注

致力于解决灵活繁复的硬编码问题~ 2020.03.30 加入

来都来了,坐下喝茶~看看ice~~

评论

发布
暂无评论
Flink+ice 实现可视化规则配置(Demo)_flink_waitmoon_InfoQ写作社区