作者:张剑
引言
在实时处理领域,先后发展了 Storm,Spark streaming,Flink 三个实时处理引擎,作为最先被广泛采用的 Storm,现在已经逐渐退出了实时处理领域。现在企业实时处理和实时数仓,绝大部分都采用 Flink 作为实时处理框架。可以说,Flink 已经成为实时计算领域事实上的标准。
Flink 从 2015 年逐渐发展起来,成为 apache 顶级项目,之后母公司又被阿里收购,在原有的基础上融合了阿里的内部的 blink 版本,现在已经发展到 1.13 版本。Flink 也在从原有的流处理上,逐步扩张到批流融合上。在 1.10 开始,Flink 对 catalog 模块进行了重构,能够兼容 hive 的 catalog,并与之打通。作为离线计算的经久不衰的 Hive 组件,使用扮演着开源离线领域数仓的基础,很多公司的离线数仓都是以 Hdfs,Yarn,Hive 为基础建设起来的。
那么,我们就以 2020 年下半年 release 的 Flink1.12 版本来,测试一下与 Hive 的使用,并探讨如何在生产上更好的应用。
第 1 节 环境准备
第 1.1 节 实时处理 Flink1.12 环境
第一步 本地环境
Mac 系统:预装了 JDK1.8 的环境(如果没有可以参考这篇文章安装,https://www.jianshu.com/p/befc951295f1,如果是 Linux 或者 Windows 环境,都是类似的,网上有很多已经写的教程,可以百度或者谷歌一下。)
第二步 安装并配置 Flink1.12
此次测试采用 Flink1.12.3 版本,理论上 Flink1.12.0/1.12.1/1.12.2/1.12.3 小版本不会有差异。
下载可以直接从官网下载:
Flink 下载页:https://flink.apache.org/downloads.html,找到 1.12.2 版本,下载 Binaries,依赖的 scala 版本,2.11 和 2.12 都可以。
https://archive.apache.org/dist/flink/flink-1.12.3/flink-1.12.3-bin-scala_2.12.tgz
下载结果:
21:52 flink-1.12.3-bin-scala_2.12.tgz
复制代码
解压:
tar -zxvf flink-1.12.3-bin-scala_2.12.tgz
复制代码
解压后的目录
查看当前目录:
-rw-r--r-- 1 jian.zhang 623285134 11K 1 15 11:36 LICENSE
-rw-r--r-- 1 jian.zhang 623285134 550K 2 27 05:29 NOTICE
-rw-r--r-- 1 jian.zhang 623285134 1.3K 1 15 11:36 README.txt
drwxr-xr-x@ 26 jian.zhang 623285134 832B 2 27 05:29 bin
drwxr-xr-x 14 jian.zhang 623285134 448B 6 20 11:48 conf
drwxr-xr-x 7 jian.zhang 623285134 224B 2 27 05:29 examples
drwxr-xr-x@ 15 jian.zhang 623285134 480B 6 20 11:19 lib
drwxr-xr-x 32 jian.zhang 623285134 1.0K 2 27 05:29 licenses
drwxr-xr-x@ 10 jian.zhang 623285134 320B 6 20 11:49 log
drwxr-xr-x@ 19 jian.zhang 623285134 608B 2 27 05:29 opt
drwxr-xr-x@ 11 jian.zhang 623285134 352B 2 27 05:29 plugins
复制代码
Flink 是开箱即用,如果没有任何配置,也可以直接在本地运行 Standalone 环境
不过为了要兼容 Hive,需要做一些额外配置。演示情况下,会通过 Flink Sql Client 进行提交任务。
增加 standalone 资源,cd conf 目录,修改 flink-conf.yaml, 设置 TaskSlots 和默认的 checkpoint 间隔和状态后端。
配置 Flink 即将要使用的 Hive 的数据库,因为使用 sql client 方式,那么需要修改 sql-client-defaults.yaml
图上配置使用 hive 的 default 数据库,hive-site 在/Users/jian.zhang04/Documents/apache-hive-2.3.8-bin/conf,这个路径是一会要安装 hive 的路径。
将读写 Hive 的依赖包准备好,放入 flink-1.12.2/lib
依赖包有两个:
下载 flink-sql-connector-hive-2.3.6
https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.6_2.11/1.12.3/flink-sql-connector-hive-2.3.6_2.11-1.12.3.jar
复制代码
拷贝下载的 hive 安装包里面的 hive-exec-2.3.8.jar
cp ../apache-hive-2.3.8-bin/lib/hive-exec-2.3.8.jar ../flink-1.12.2/lib/
复制代码
到此,Flink 的测试准备环境就完成了,等离线环境准备好之后,就可以启动 standalone 集群和 Sql-client 来管理 catalog 和提交任务了。
第 1.2 节 离线数仓 hive2.3.8 环境
Hadoop 环境:hadoop-2.9.2(安装可以参考https://blog.csdn.net/sone_thor/article/details/100162896)
Hive 2.3.8
下载并解压
wget https://downloads.apache.org/hive/hive-2.3.8/apache-hive-2.3.8-bin.tar.gz
tar -zxvf apache-hive-2.3.8-bin.tar.gz
ls /Users/jian.zhang04/Documents/apache-hive-2.3.8-bin
-rw-r--r-- 1 jian.zhang04 623285134 20K 1 7 03:37 LICENSE
-rw-r--r-- 1 jian.zhang04 623285134 230B 1 7 03:37 NOTICE
-rw-r--r-- 1 jian.zhang04 623285134 702B 1 8 03:36 RELEASE_NOTES.txt
drwxr-xr-x 11 jian.zhang04 623285134 352B 6 20 11:24 bin
drwxr-xr-x 21 jian.zhang04 623285134 672B 2 19 17:30 binary-package-licenses
drwxr-xr-x 14 jian.zhang04 623285134 448B 6 20 10:22 conf
drwxr-xr-x 4 jian.zhang04 623285134 128B 2 19 17:30 examples
drwxr-xr-x 7 jian.zhang04 623285134 224B 2 19 17:30 hcatalog
drwxr-xr-x 13 jian.zhang04 623285134 416B 6 20 11:29 iotmp
drwxr-xr-x 3 jian.zhang04 623285134 96B 2 19 17:30 jdbc
drwxr-xr-x 290 jian.zhang04 623285134 9.1K 6 20 10:18 lib
drwxr-xr-x 4 jian.zhang04 623285134 128B 2 19 17:30 scripts
复制代码
安装 mysql
sudo apt-get install -y mysql-server-5.7
输入 root 密码完成安装
下载 mysql jdbc driver
cd /opt/bigdata/hive-2.3.8/lib
wget https://maven.aliyun.com/repository/public/mysql/mysql-connector-java/8.0.20/mysql-connector-java-8.0.20.jar
开启 mysql 远程连接
GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY '密码' WITH GRANT OPTION;
flush privileges;
再进入 /etc/mysql/mysql.conf.d/mysqld.cnf 文件将 bind-address 设置成 0.0.0.0
重启 mysql 服务
sudo systemctl restart mysql
复制代码
配置环境变量
在 /etc/profile 添加如下配置:
export HIVE_HOME=/Users/jian.zhang04/Documents/apache-hive-2.3.8-bin
PATH=$PATH:$HIVE_HOME/bin
source /etc/profile 使配置生效
复制代码
配置 hive
mv conf/hive-env.sh.template conf/hive-env.sh # 重命名环境文件
mv conf/hive-log4j2.properties.template conf/hive-log4j2.properties # 重命名日志文件
cp conf/hive-default.xml.template conf/hive-site.xml # 拷贝生成 xml 文件
复制代码
然后修改 hive-site.xml 文件如下配置
<!-- hive元数据地址,默认是/user/hive/warehouse -->
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
</property>
<!-- hive查询时输出列名 -->
<property>
<name>hive.cli.print.header</name>
<value>true</value>
</property>
<!-- 显示当前数据库名 -->
<property>
<name>hive.cli.print.current.db</name>
<value>true</value>
</property>
<!-- 开启本地模式,默认是false -->
<property>
<name>hive.exec.mode.local.auto</name>
<value>true</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
<description>Username to use against metastore database</description>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>root</value>
<description>password to use against metastore database</description>
</property>
<!-- URL用于连接远程元数据 -->
<property>
<name>hive.metastore.uris</name>
<value>thrift://master1:9083</value>
</property>
<!-- 元数据使用mysql数据库 -->
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://master1:3306/hivedb?createDatabaseIfNotExist=true&userSSL=false</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.cj.jdbc.Driver</value>
<description>Driver class name for a JDBC metastore</description>
</property>
复制代码
验证 Hive 安装
运行 Hive 之前,需要创建 /tmp 文件夹在 HDFS 独立的 Hive 文件夹。在这里使用 /user/hive/warehouse 文件夹。
hadoop fs -mkdir /tmp
hadoop fs -mkdir -p /user/hive/warehouse
hadoop fs -chmod g+w /tmp
hadoop fs -chmod g+w /user/hive/warehouse
复制代码
初始化 Hive
schematool -dbType mysql -initSchema
出现 schemeTool completed 表示初始化成功
Metastore connection URL: jdbc:mysql://master1:3306/hivedb?createDatabaseIfNotExist=true&userSSL=false
Metastore Connection Driver : com.mysql.cj.jdbc.Driver
Metastore connection User: root
Starting metastore schema initialization to 2.3.0
Initialization script hive-schema-2.3.0.mysql.sql
Initialization script completed
schemaTool completed
复制代码
开启元数据服务
nohup hive --service metastore &
复制代码
进入 Hive Shell
Logging initialized using configuration in file:/opt/bigdata/hive-2.3.8/conf/hive-log4j2.properties Async: true
Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
hive> show databases;
OK
default
Time taken: 8.601 seconds, Fetched: 1 row(s)
hive>
Hiveserver2
复制代码
开启 hiveserver2,修改 hive-site.xml 配置文件,增加如下配置:
<!-- 这是hiveserver2 -->
<property>
<name>hive.server2.thrift.port</name>
<value>10000</value>
</property>
<property>
<name>hive.server2.thrift.bind.host</name>
<value>master1</value>
</property>
复制代码
配置 hadoop 中的 core-site.xml 文件,并同步带其他主机
<property>
<name>hadoop.proxyuser.root.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.root.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.bennie.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.bennie.groups</name>
<value>*</value>
</property>
复制代码
启动 hiveserver2
hiveserver2
#或者
hive --service hiveserver2
# 后台运行
nohup hiveserver2 &
复制代码
至此,Hive 的准备环境已经 OK。
第 2 节 测试 Flink 读写 Hive 表
第 2.1 节 测试 Flink 基本读写
启动 standalone 集群
进入到flink-1.12.2目录,执行启动命令
bin/start-cluster.sh
浏览器打开:http://localhost:8081/#/overview,已经有10个slots可用
复制代码
启动 Sql-client
进入到flink-1.12.2目录
bin/sql-client.sh embedded
复制代码
执行查看hive的表命令
create database t1;
create database t2;
show databases;
use t2;
show tables;
复制代码
测试过程中发现一个有问题的地方
切换数据库之后,再次切换 default,会抛异常(这个之后再研究)。
第 2.2 节 Flink 写入已经存在的 Hive 表
创建一张 hive 内部表
set table.sql-dialect=hive;
CREATE TABLE IF NOT EXISTS sink1 (
f1 int ,
f2 int)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
;
复制代码
创建一张数据源表
set table.sql-dialect=default;
//一秒钟生成一条测试数据
CREATE TABLE source1 (
f1 int,
f2 int
) WITH (
'connector' = 'datagen',
'rows-per-second' ='1'
);
复制代码
实际在 hive 表,我们用 hive client 查看一下,flink 创建的表,在 hive 里面是一张对应的表
提交 flink 写入任务
insert into sink1 select * from source1;
复制代码
可以看到 Flink 任务已经正常运行,CK 已经正常,我们可以查看一下数据。数据已经正常写入。
第 2.3 节 Flink 创建 Hive 分区表并写入
执行语句:
set table.sql-dialect=hive;
CREATE TABLE hive_table (
user_id STRING,
order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.delay'='5 minutes',
'sink.partition-commit.policy.kind'='metastore,success-file'
);
set table.sql-dialect=default;
CREATE TABLE datagen_table (
user_id STRING,
order_amount DOUBLE,
log_ts TIMESTAMP(3),
WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH ('connector' = 'datagen',
'rows-per-second' ='1');
INSERT INTO hive_table
SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH')
FROM datagen_table;
复制代码
等待做 checkpoint,然后查看写入的数据,用 flink-sql 查看
select count(1) from hive_table;
set table.sql-dialect=hive;
复制代码
至此,Flink 读写均已测试。
第 3 节 Flink 读写 Hive 表原理
第 3.1 节 识别 Hive 表
Flink 读取 Hive 表跟其他的外部数据源不太一样,其他的组件是通过 with 属性中的 type+SPI 服务扫描对应的 Factory 类实现,而 hive 则是直接基于 HiveCatalog 创建。
创建 TableFactory 之后的套路,就都一样了。比如创建对应的 HiveTableSource,在调用 TableSource 的 getDataStream 方法时,创建对应的 HiveTableInpurtFormat,最后基于这个 inputFormat 执行查询操作。
第 3.2 节 如何读取 Hive 中的数据
先来看看第一个问题,一个标准的 Hive 表创建后,可以在 hive 命令行中查看对应的创建信息,使用的命令为:hive> describe formatted table_name;
看到这里应该就能猜想到 Flink 是如何读取数据的了吧:1 获取 HiveCatalog 中的定义,拿到对应的目录地址、SerDe、InputFormat
2 通过反射创建对应的反序列化器和 InputFormat,获得一条结果
3 调用 recordReader.next()获取下一条数据
第 3.3 节 Hive 表的并行度控制
在 HiveTableSource 调用 getDataStream 的第一步会初始化分区信息,返回当前 sql 匹配的分区。然后创建对应的 InputFormat,当开启了并行度自动推断后,基于前面的分区信息,统计文件个数,并把对应的文件内容放入 InputSplit[]中。这个数组有两个作用:
后面判断并行度时,根据该数组的长度与默认配置进行比较,确定最终并行度
切分形成 Task 后,task 会基于这个数组确定每个 Slot 读取的文件
第 4 节 如何在生产上使用设想
弄明白了 Flink 读写 Hive 的行为和表现之后,那么如何在生产上使用呢。
第 4.1 节 实时血缘
第一个问题:测试完之后,发现如何用了 hive 的 catalog,那么创建的 kafka 数据源等,都会作为元数据存储到 schema 里面。这个信息如果存入 hive 里面,之后离线数仓的血缘,是不是一套就解决了呢?
第 4.2 节 实时离线 hive 表混用问题
第二个问题:但是里面还存在另一个问题,如果直接用了离线的 Hive,创建这么多实时的表,会不会造成和离线的表管理混乱?
第 4.3 节 多 flink 任务在 hive 中表重复
第三个问题:如果所有的实时任务用同一个 hive 的库,每个实时任务之前都是独立的,那么每个人写的任务创建的表,就有可能重合和错乱。如果用不同的 hive 的库,那么 hive 的库的多少和任务数可能就是线性关系了,这个是否合理呢?
第 4.4 节 应对思路
首先,第一个问题,血缘确实可以依靠 hive 做,如果公司本身离线数仓已经做过血缘依赖的项目,就可以直接拿来复用。
其次,第二个问题,离线和实时共用一个 hive 的问题,需要解决 Flink 使用离线 Hive 的权限问题,需要有权限管理,不然实时数据表会对离线造成影响。
第三,第三个问题,如果是 SQL 任务,那么需要在任务提交之前,将所有的 create 表名抽离出来,并对表名进行重命名管理。比如一般公司 flink 任务名是不能重复,就可以用 flink 任务名+表名作为新的名称。其次,需要在任务停止或者删除的时候,额外的去删除任务在 hive 中创建表,或者做单独处理。
以上,就是我的测试和一些想法,如果有不对的地方,还望海涵。
评论