写点什么

StreamX: Flink 开发脚手架, 流批一体大数据平台正式开源

用户头像
B e n
关注
发布于: 2021 年 04 月 12 日

StreamX

let flink|spark easy


一个神奇的框架,让 Flink 开发更简单

🚀 什么是 StreamX

    大数据技术如今发展的如火如荼,已经呈现百花齐放欣欣向荣的景象,实时处理流域 Apache SparkApache Flink 更是一个伟大的进步,尤其是Apache Flink被普遍认为是下一代大数据流计算引擎,我们在使用 Flink 时发现从编程模型, 启动配置到运维管理都有很多可以抽象共用的地方, 我们将一些好的经验固化下来并结合业内的最佳实践, 通过不断努力终于诞生了今天的框架 —— StreamX, 采用 java/scala 开发, 项目的初衷是 —— 让 Flink 开发更简单,使用StreamX开发,可以极大降低学习成本和开发门槛, 让开发者只用关心最核心的业务,StreamX 规范了项目的配置,鼓励函数式编程,提供了一系列开箱即用的Connectors,支持scalajava两套 api, 并且提供一个数据平台,基于Apache Flink 封装的一个可视化的,轻量级的 Flink Submit 系统,旨在简化 Flink 任务提交和管理运维,标准化了配置、开发、测试、部署、监控、运维的整个过程,其最终目的是打造一个一站式大数据平台,流批一体,湖仓一体的解决方案


官网地址 http://www.streamxhub.com

github 地址 https://github.com/streamxhub/streamx




🎉 Features

  • 开发脚手架

  • 一系列开箱即用的 connectors

  • 项目编译功能(maven 编译)

  • 支持Applicaion 模式, Yarn-Per-Job模式启动

  • 快捷的日常操作(任务启动停止savepoint,从savepoint恢复)

  • 支持火焰图

  • 支持notebook(在线任务开发)

  • 项目配置和依赖版本化管理

  • 支持任务备份、回滚(配置回滚)

  • 在线管理依赖(maven pom)和自定义 jar

  • 自定义 udf、连接器等支持

  • Flink sql Submit

  • Flink Sql WebIDE

  • 支持 catalog、hive

  • 从任务开发阶段到部署管理全链路支持

  • ...

组成部分

Streamx有三部分组成,分别是streamx-core,streamx-pumpstreamx-console

StreamX 架构图

streamx-core

streamx-core 定位是一个开发时框架,关注编码开发,规范了配置文件,按照约定优于配置的方式进行开发,提供了一个开发时 RunTime Content和一系列开箱即用的Connector,扩展了DataStream相关的方法,融合了DataStreamFlink sql api,简化繁琐的操作,聚焦业务本身,提高开发效率和开发体验

streamx-pump

pump 是抽水机,水泵的意思,streamx-pump的定位是一个数据抽取的组件,类似于flinkx,基于streamx-core中提供的各种connector开发,目的是打造一个方便快捷,开箱即用的大数据实时数据抽取和迁移组件,并且集成到streamx-console中,解决实时数据源获取问题,目前在规划中

streamx-console

streamx-console 是一个综合实时数据平台,低代码(Low Code)平台,可以较好的管理Flink任务,集成了项目编译、发布、参数配置、启动、savepoint,火焰图(flame graph),Flink SQL,监控等诸多功能于一体,大大简化了Flink任务的日常操作和维护,融合了诸多最佳实践。旧时王谢堂前燕,飞入寻常百姓家,让大公司有能力研发使用的项目,现在人人可以使用,其最终目标是打造成一个实时数仓,流批一体的一站式大数据解决方案

如何安装

streamx-console 提供了开箱即用的安装包,安装之前对环境有些要求,具体要求如下

环境

操作系统: Linux 必须: 是

JAVA: 1.8+ 必须:

Maven: 3+ 必须: 说明: 部署机器必须安装 Maven,且配置好环境变量,项目编译会用到

Hadoop: 2+ 必须: 说明: HDFS,YARN 等必须安装,并且配置好相关环境变量

Flink: 1.12.0+ 必须: 说明: Flink 版本必须是 1.12.1 或以上版本,并且配置好 Flink 相关环境变量

MySQL: 5.6+ 必须: 说明: 部署机器或其他机器得安装 MySQL,系统会用到 MySQL

Python: 2+ 必须: 说明: 非必须,火焰图功能会用到 Python

Perl: 5.16.3+ 必须: 说明: 非必须,火焰图功能会用到 Python

安装

在安装前一定要确保当前部署的机器满足上面环境相关的要求,当前安装的机器必须要有 Hadoop 环境,安装并配置好了Flink 1.12.0+,如果准备工作都已就绪,就可以安装了


git clone https://github.com/streamxhub/streamx.gitcd Streamxmvn clean install -DskipTests -Denv=dev
复制代码


编译成功后,在 streamx-console-service 模块下找到 streamx-console-service-1.0.0-bin-tar.gz,

解包后目录如下


.streamx-console-service-1.0.0├── bin│    ├── flame-graph│    ├──   └── *.py                             //火焰图相关功能脚本(内部使用,用户无需关注)│    ├── startup.sh                             //启动脚本  │    ├── setclasspath.sh                        //java环境变量相关的脚本(内部使用,用户无需关注)│    ├── shutdown.sh                            //停止脚本│    ├── yaml.sh                                //内部使用解析yaml参数的脚本(内部使用,用户无需关注)├── conf                           │    ├── application.yaml                       //项目的配置文件(注意不要改动名称)│    ├── application-prod.yml                   //项目的配置文件(开发者部署需要改动的文件,注意不要改动名称)│    ├── flink-application.template             //flink配置模板(内部使用,用户无需关注)│    ├── logback-spring.xml                     //logback│    └── streamx.sql                                 //工程初始化脚本├── lib│    └── *.jar                                  //项目的jar包├── plugins   │    ├── streamx-jvm-profiler-1.0.0.jar         //jvm-profiler,火焰图相关功能(内部使用,用户无需关注)│    └── streamx-flink-sqlcli-1.0.0.jar         //Flink SQl提交相关功能(内部使用,用户无需关注)├── logs                                        //程序log目录└── temp                                        //内部使用到的零时路径,不要删除

复制代码

1.初始化工程 SQL

streamx-console 要求的数据库是 MySQL,版本 5.6+以上,如准备就绪则进行下面的操作:

  • 创建数据库:streamx

  • 执行初始化 sql (解包后的 conf/streamx.sql)

2.修改相关的数据库信息

工程 SQL 初始化完毕,则修改conf/application-prod.yml,找到 datasource 这一项,找到 mysql 的配置,修改成对应的信息即可,如下

  datasource:    dynamic:      # 是否开启 SQL日志输出,生产环境建议关闭,有性能损耗      p6spy: true      hikari:        connection-timeout: 30000        max-lifetime: 1800000        max-pool-size: 15        min-idle: 5        connection-test-query: select 1        pool-name: HikariCP-DS-POOL      # 配置默认数据源      primary: primary      datasource:        # 数据源-1,名称为 primary        primary:          username: $user          password: $password          driver-class-name: com.mysql.cj.jdbc.Driver          url: jdbc:mysql://$host:$port/streamx?useUnicode=true&characterEncoding=UTF-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8
复制代码

3.启动 streamx-console

进入到bin下直接执行 start.sh 即可启动项目,默认端口是 10000,如果没啥意外则会启动成功


cd streamx-console-service-1.0.0/binbash start.sh
复制代码


相关的日志会输出到 streamx-console-service-1.0.0/logs/streamx.out


打开浏览器 输入 http://$deploy_host:10000/index.html 即可登录,登录界面如下



默认密码: admin / streamx

4. 系统配置

进入系统之后,第一件要做的事情就是修改系统配置,在菜单/StreamX/Setting 下,修改StreamX Webapp addressStreamX Console Workspace 如下:



  • StreamX Webapp address 配置StreamX Console后台服务的访问地址

  • StreamX Console Workspace 配置系统的工作空间,用于存放项目源码,编译后的项目等

如何使用

streamx-console 定位是流批一体的大数据平台,一站式解决方案,使用起来非常简单,没有复杂的概念和繁琐的操作,标准的 Flink 程序(安装 Flink 官方要去的结构和规范)和用streamx开发的项目都做了很好的支持,下面我们使用streamx-quickstart来快速开启 streamx-console 之旅

streamx-quickstart是 StreamX 开发 Flink 的上手示例程序,具体请查阅


部署 DataStream 任务

下面的示例演示了如何部署一个 DataStream 应用

http://assets.streamxhub.com/20210408008.mp4


部署 FlinkSql 任务

下面的示例演示了如何部署一个 FlinkSql 应用

http://assets.streamxhub.com/flinksql.mp4


  • 项目演示使用到的 flink sql 如下

CREATE TABLE user_log (    user_id VARCHAR,    item_id VARCHAR,    category_id VARCHAR,    behavior VARCHAR,    ts TIMESTAMP(3)) WITH ('connector.type' = 'kafka', -- 使用 kafka connector'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本'connector.topic' = 'user_behavior',  -- kafka topic'connector.properties.bootstrap.servers'='kafka-1:9092,kafka-2:9092,kafka-3:9092','connector.startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取'update-mode' = 'append','format.type' = 'json',  -- 数据源格式为 json'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则);
CREATE TABLE pvuv_sink ( dt VARCHAR, pv BIGINT, uv BIGINT) WITH ('connector.type' = 'jdbc', -- 使用 jdbc connector'connector.url' = 'jdbc:mysql://test-mysql:3306/test', -- jdbc url'connector.table' = 'pvuv_sink', -- 表名'connector.username' = 'root', -- 用户名'connector.password' = '123456', -- 密码'connector.write.flush.max-rows' = '1' -- 默认5000条,为了演示改为1条);
INSERT INTO pvuv_sinkSELECT DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt, COUNT(*) AS pv, COUNT(DISTINCT user_id) AS uvFROM user_logGROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00');
复制代码


  • 使用到 maven 依赖如下


<dependency>    <groupId>mysql</groupId>    <artifactId>mysql-connector-java</artifactId>    <version>5.1.48</version></dependency>
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-sql-connector-kafka_2.12</artifactId> <version>1.12.0</version></dependency>
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-jdbc_2.11</artifactId> <version>1.12.0</version></dependency>
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>1.12.0</version></dependency>
复制代码


  • Kafka 模拟发送的数据如下


{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts":"2021-02-01T01:00:00Z"}{"user_id": "662867", "item_id":"2244074","category_id":"1575622","behavior": "pv", "ts":"2021-02-01T01:00:00Z"}{"user_id": "662867", "item_id":"2244074","category_id":"1575622","behavior": "pv", "ts":"2021-02-01T01:00:00Z"}{"user_id": "662867", "item_id":"2244074","category_id":"1575622","behavior": "learning flink", "ts":"2021-02-01T01:00:00Z"}
复制代码

任务启动流程

任务启动流程图如下


关于项目的概念,Development Mode,savepoint,NoteBook,自定义 jar 管理,任务发布,任务恢复,参数配置,参数对比,多版本管理等等更多使用教程和文档请移步官网http://www.streamxhub.com


目前项目已正式开源,无数个日夜里,作者在源码中苦苦寻找答案,并以此为乐,历经无数汗水,现终于得见天日. 现在她如初生婴儿一般满怀无限憧憬的出现在世人面前,请多多关照,如果眼下还是一团零星之火,运筹帷幄之后,迎面东风,就是一场烈焰燎原吧

发布于: 2021 年 04 月 12 日阅读数: 906
用户头像

B e n

关注

还未添加个人签名 2019.02.11 加入

还未添加个人简介

评论 (2 条评论)

发布
用户头像
KubernetesSubmit支持application模式发布吗?支持ingress对外暴露rest服务么?
2021 年 04 月 12 日 23:16
回复
你好,目前只支持application和yarnprejob提交模式, Kubernetes暂时还不支持,后续会支持
2021 年 04 月 12 日 23:34
回复
没有更多了
StreamX: Flink开发脚手架,流批一体大数据平台正式开源