写点什么

Flink 集成 hive 测试及生产规划

发布于: 1 小时前
Flink集成hive测试及生产规划

作者:张剑

引言

在实时处理领域,先后发展了 Storm,Spark streaming,Flink 三个实时处理引擎,作为最先被广泛采用的 Storm,现在已经逐渐退出了实时处理领域。现在企业实时处理和实时数仓,绝大部分都采用 Flink 作为实时处理框架。可以说,Flink 已经成为实时计算领域事实上的标准。

Flink 从 2015 年逐渐发展起来,成为 apache 顶级项目,之后母公司又被阿里收购,在原有的基础上融合了阿里的内部的 blink 版本,现在已经发展到 1.13 版本。Flink 也在从原有的流处理上,逐步扩张到批流融合上。在 1.10 开始,Flink 对 catalog 模块进行了重构,能够兼容 hive 的 catalog,并与之打通。作为离线计算的经久不衰的 Hive 组件,使用扮演着开源离线领域数仓的基础,很多公司的离线数仓都是以 Hdfs,Yarn,Hive 为基础建设起来的。

那么,我们就以 2020 年下半年 release 的 Flink1.12 版本来,测试一下与 Hive 的使用,并探讨如何在生产上更好的应用。

第 1 节 环境准备

第 1.1 节 实时处理 Flink1.12 环境

第一步 本地环境

Mac 系统:预装了 JDK1.8 的环境(如果没有可以参考这篇文章安装,https://www.jianshu.com/p/befc951295f1,如果是 Linux 或者 Windows 环境,都是类似的,网上有很多已经写的教程,可以百度或者谷歌一下。)

第二步 安装并配置 Flink1.12

此次测试采用 Flink1.12.3 版本,理论上 Flink1.12.0/1.12.1/1.12.2/1.12.3 小版本不会有差异。

下载可以直接从官网下载:

Flink 下载页:https://flink.apache.org/downloads.html,找到 1.12.2 版本,下载 Binaries,依赖的 scala 版本,2.11 和 2.12 都可以。

https://archive.apache.org/dist/flink/flink-1.12.3/flink-1.12.3-bin-scala_2.12.tgz

下载结果:

21:52 flink-1.12.3-bin-scala_2.12.tgz
复制代码

解压:

tar -zxvf flink-1.12.3-bin-scala_2.12.tgz
复制代码

解压后的目录

cd flink-1.12.2
复制代码

查看当前目录:

-rw-r--r--   1 jian.zhang  623285134    11K  1 15 11:36 LICENSE-rw-r--r--   1 jian.zhang  623285134   550K  2 27 05:29 NOTICE-rw-r--r--   1 jian.zhang  623285134   1.3K  1 15 11:36 README.txtdrwxr-xr-x@ 26 jian.zhang  623285134   832B  2 27 05:29 bindrwxr-xr-x  14 jian.zhang  623285134   448B  6 20 11:48 confdrwxr-xr-x   7 jian.zhang  623285134   224B  2 27 05:29 examplesdrwxr-xr-x@ 15 jian.zhang  623285134   480B  6 20 11:19 libdrwxr-xr-x  32 jian.zhang  623285134   1.0K  2 27 05:29 licensesdrwxr-xr-x@ 10 jian.zhang  623285134   320B  6 20 11:49 logdrwxr-xr-x@ 19 jian.zhang  623285134   608B  2 27 05:29 optdrwxr-xr-x@ 11 jian.zhang  623285134   352B  2 27 05:29 plugins
复制代码

Flink 是开箱即用,如果没有任何配置,也可以直接在本地运行 Standalone 环境

不过为了要兼容 Hive,需要做一些额外配置。演示情况下,会通过 Flink Sql Client 进行提交任务。

增加 standalone 资源,cd conf 目录,修改 flink-conf.yaml, 设置 TaskSlots 和默认的 checkpoint 间隔和状态后端。



配置 Flink 即将要使用的 Hive 的数据库,因为使用 sql client 方式,那么需要修改 sql-client-defaults.yaml



图上配置使用 hive 的 default 数据库,hive-site 在/Users/jian.zhang04/Documents/apache-hive-2.3.8-bin/conf,这个路径是一会要安装 hive 的路径。


将读写 Hive 的依赖包准备好,放入 flink-1.12.2/lib

依赖包有两个:

下载 flink-sql-connector-hive-2.3.6

https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.6_2.11/1.12.3/flink-sql-connector-hive-2.3.6_2.11-1.12.3.jar
复制代码

拷贝下载的 hive 安装包里面的 hive-exec-2.3.8.jar

cp ../apache-hive-2.3.8-bin/lib/hive-exec-2.3.8.jar ../flink-1.12.2/lib/
复制代码

到此,Flink 的测试准备环境就完成了,等离线环境准备好之后,就可以启动 standalone 集群和 Sql-client 来管理 catalog 和提交任务了。

第 1.2 节 离线数仓 hive2.3.8 环境

Hadoop 环境:hadoop-2.9.2(安装可以参考https://blog.csdn.net/sone_thor/article/details/100162896)

Hive 2.3.8

下载并解压

wget https://downloads.apache.org/hive/hive-2.3.8/apache-hive-2.3.8-bin.tar.gz
tar -zxvf apache-hive-2.3.8-bin.tar.gz
ls /Users/jian.zhang04/Documents/apache-hive-2.3.8-bin
-rw-r--r-- 1 jian.zhang04 623285134 20K 1 7 03:37 LICENSE-rw-r--r-- 1 jian.zhang04 623285134 230B 1 7 03:37 NOTICE-rw-r--r-- 1 jian.zhang04 623285134 702B 1 8 03:36 RELEASE_NOTES.txtdrwxr-xr-x 11 jian.zhang04 623285134 352B 6 20 11:24 bindrwxr-xr-x 21 jian.zhang04 623285134 672B 2 19 17:30 binary-package-licensesdrwxr-xr-x 14 jian.zhang04 623285134 448B 6 20 10:22 confdrwxr-xr-x 4 jian.zhang04 623285134 128B 2 19 17:30 examplesdrwxr-xr-x 7 jian.zhang04 623285134 224B 2 19 17:30 hcatalogdrwxr-xr-x 13 jian.zhang04 623285134 416B 6 20 11:29 iotmpdrwxr-xr-x 3 jian.zhang04 623285134 96B 2 19 17:30 jdbcdrwxr-xr-x 290 jian.zhang04 623285134 9.1K 6 20 10:18 libdrwxr-xr-x 4 jian.zhang04 623285134 128B 2 19 17:30 scripts
复制代码

安装 mysql

sudo apt-get install -y mysql-server-5.7输入 root 密码完成安装
下载 mysql jdbc driver
cd /opt/bigdata/hive-2.3.8/lib
wget https://maven.aliyun.com/repository/public/mysql/mysql-connector-java/8.0.20/mysql-connector-java-8.0.20.jar
开启 mysql 远程连接
GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY '密码' WITH GRANT OPTION;
flush privileges;
再进入 /etc/mysql/mysql.conf.d/mysqld.cnf 文件将 bind-address 设置成 0.0.0.0
重启 mysql 服务
sudo systemctl restart mysql
复制代码


配置环境变量

在 /etc/profile 添加如下配置:
export HIVE_HOME=/Users/jian.zhang04/Documents/apache-hive-2.3.8-bin
PATH=$PATH:$HIVE_HOME/bin
source /etc/profile 使配置生效
复制代码

配置 hive

mv conf/hive-env.sh.template conf/hive-env.sh # 重命名环境文件
mv conf/hive-log4j2.properties.template conf/hive-log4j2.properties # 重命名日志文件
cp conf/hive-default.xml.template conf/hive-site.xml # 拷贝生成 xml 文件
复制代码

然后修改 hive-site.xml 文件如下配置

<!-- hive元数据地址,默认是/user/hive/warehouse -->
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
</property>
<!-- hive查询时输出列名 -->
<property>
<name>hive.cli.print.header</name>
<value>true</value>
</property>
<!-- 显示当前数据库名 -->
<property>
<name>hive.cli.print.current.db</name>
<value>true</value>
</property>
<!-- 开启本地模式,默认是false -->
<property>
<name>hive.exec.mode.local.auto</name>
<value>true</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
<description>Username to use against metastore database</description>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>root</value>
<description>password to use against metastore database</description>
</property>
<!-- URL用于连接远程元数据 -->
<property>
<name>hive.metastore.uris</name>
<value>thrift://master1:9083</value>
</property>
<!-- 元数据使用mysql数据库 -->
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://master1:3306/hivedb?createDatabaseIfNotExist=true&amp;userSSL=false</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.cj.jdbc.Driver</value>
<description>Driver class name for a JDBC metastore</description>
</property>
复制代码


验证 Hive 安装

运行 Hive 之前,需要创建 /tmp 文件夹在 HDFS 独立的 Hive 文件夹。在这里使用 /user/hive/warehouse 文件夹。

hadoop fs -mkdir /tmp
hadoop fs -mkdir -p /user/hive/warehouse
hadoop fs -chmod g+w /tmp
hadoop fs -chmod g+w /user/hive/warehouse
复制代码

初始化 Hive

schematool -dbType mysql -initSchema
出现 schemeTool completed 表示初始化成功
Metastore connection URL: jdbc:mysql://master1:3306/hivedb?createDatabaseIfNotExist=true&userSSL=false
Metastore Connection Driver : com.mysql.cj.jdbc.Driver
Metastore connection User: root
Starting metastore schema initialization to 2.3.0
Initialization script hive-schema-2.3.0.mysql.sql
Initialization script completed
schemaTool completed
复制代码

开启元数据服务

nohup hive --service metastore &
复制代码

进入 Hive Shell

Logging initialized using configuration in file:/opt/bigdata/hive-2.3.8/conf/hive-log4j2.properties Async: true
Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
hive> show databases;
OK
default
Time taken: 8.601 seconds, Fetched: 1 row(s)
hive>
Hiveserver2
复制代码

开启 hiveserver2,修改 hive-site.xml 配置文件,增加如下配置:

<!-- 这是hiveserver2 -->
<property>
<name>hive.server2.thrift.port</name>
<value>10000</value>
</property>
<property>
<name>hive.server2.thrift.bind.host</name>
<value>master1</value>
</property>
复制代码

配置 hadoop 中的 core-site.xml 文件,并同步带其他主机

<property>
<name>hadoop.proxyuser.root.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.root.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.bennie.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.bennie.groups</name>
<value>*</value>
</property>
复制代码

启动 hiveserver2

hiveserver2
#或者
hive --service hiveserver2
# 后台运行
nohup hiveserver2 &
复制代码



至此,Hive 的准备环境已经 OK。


第 2 节 测试 Flink 读写 Hive 表

第 2.1 节 测试 Flink 基本读写

启动 standalone 集群

进入到flink-1.12.2目录,执行启动命令bin/start-cluster.sh浏览器打开:http://localhost:8081/#/overview,已经有10个slots可用
复制代码



启动 Sql-client

进入到flink-1.12.2目录bin/sql-client.sh embedded
复制代码




执行查看hive的表命令create database t1;create database t2;show databases;use t2;show tables;
复制代码


测试过程中发现一个有问题的地方

切换数据库之后,再次切换 default,会抛异常(这个之后再研究)。


第 2.2 节 Flink 写入已经存在的 Hive 表

创建一张 hive 内部表

set table.sql-dialect=hive;CREATE TABLE IF NOT EXISTS sink1 (f1 int ,f2 int) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
复制代码


创建一张数据源表

set table.sql-dialect=default;//一秒钟生成一条测试数据CREATE TABLE source1 (    f1 int,    f2 int) WITH (  'connector' = 'datagen',  'rows-per-second' ='1');
复制代码


实际在 hive 表,我们用 hive client 查看一下,flink 创建的表,在 hive 里面是一张对应的表

提交 flink 写入任务

insert into sink1 select * from source1;
复制代码





可以看到 Flink 任务已经正常运行,CK 已经正常,我们可以查看一下数据。数据已经正常写入。


第 2.3 节 Flink 创建 Hive 分区表并写入

执行语句:

set table.sql-dialect=hive;CREATE TABLE hive_table (  user_id STRING,  order_amount DOUBLE) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',  'sink.partition-commit.trigger'='partition-time',  'sink.partition-commit.delay'='5 minutes',  'sink.partition-commit.policy.kind'='metastore,success-file');
set table.sql-dialect=default;CREATE TABLE datagen_table ( user_id STRING, order_amount DOUBLE, log_ts TIMESTAMP(3), WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND) WITH ('connector' = 'datagen', 'rows-per-second' ='1');
INSERT INTO hive_table SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH')FROM datagen_table;
复制代码




等待做 checkpoint,然后查看写入的数据,用 flink-sql 查看

select count(1) from hive_table;
set table.sql-dialect=hive;
复制代码



至此,Flink 读写均已测试。

第 3 节 Flink 读写 Hive 表原理

第 3.1 节 识别 Hive 表

Flink 读取 Hive 表跟其他的外部数据源不太一样,其他的组件是通过 with 属性中的 type+SPI 服务扫描对应的 Factory 类实现,而 hive 则是直接基于 HiveCatalog 创建。

创建 TableFactory 之后的套路,就都一样了。比如创建对应的 HiveTableSource,在调用 TableSource 的 getDataStream 方法时,创建对应的 HiveTableInpurtFormat,最后基于这个 inputFormat 执行查询操作。


第 3.2 节 如何读取 Hive 中的数据

先来看看第一个问题,一个标准的 Hive 表创建后,可以在 hive 命令行中查看对应的创建信息,使用的命令为:hive> describe formatted table_name;



看到这里应该就能猜想到 Flink 是如何读取数据的了吧:1 获取 HiveCatalog 中的定义,拿到对应的目录地址、SerDe、InputFormat

2 通过反射创建对应的反序列化器和 InputFormat,获得一条结果

3 调用 recordReader.next()获取下一条数据

第 3.3 节 Hive 表的并行度控制

在 HiveTableSource 调用 getDataStream 的第一步会初始化分区信息,返回当前 sql 匹配的分区。然后创建对应的 InputFormat,当开启了并行度自动推断后,基于前面的分区信息,统计文件个数,并把对应的文件内容放入 InputSplit[]中。这个数组有两个作用:

后面判断并行度时,根据该数组的长度与默认配置进行比较,确定最终并行度

切分形成 Task 后,task 会基于这个数组确定每个 Slot 读取的文件


第 4 节 如何在生产上使用设想

弄明白了 Flink 读写 Hive 的行为和表现之后,那么如何在生产上使用呢。

第 4.1 节 实时血缘

第一个问题:测试完之后,发现如何用了 hive 的 catalog,那么创建的 kafka 数据源等,都会作为元数据存储到 schema 里面。这个信息如果存入 hive 里面,之后离线数仓的血缘,是不是一套就解决了呢?

第 4.2 节 实时离线 hive 表混用问题

第二个问题:但是里面还存在另一个问题,如果直接用了离线的 Hive,创建这么多实时的表,会不会造成和离线的表管理混乱?

第 4.3 节 多 flink 任务在 hive 中表重复

第三个问题:如果所有的实时任务用同一个 hive 的库,每个实时任务之前都是独立的,那么每个人写的任务创建的表,就有可能重合和错乱。如果用不同的 hive 的库,那么 hive 的库的多少和任务数可能就是线性关系了,这个是否合理呢?

第 4.4 节 应对思路

首先,第一个问题,血缘确实可以依靠 hive 做,如果公司本身离线数仓已经做过血缘依赖的项目,就可以直接拿来复用。

其次,第二个问题,离线和实时共用一个 hive 的问题,需要解决 Flink 使用离线 Hive 的权限问题,需要有权限管理,不然实时数据表会对离线造成影响。

第三,第三个问题,如果是 SQL 任务,那么需要在任务提交之前,将所有的 create 表名抽离出来,并对表名进行重命名管理。比如一般公司 flink 任务名是不能重复,就可以用 flink 任务名+表名作为新的名称。其次,需要在任务停止或者删除的时候,额外的去删除任务在 hive 中创建表,或者做单独处理。

以上,就是我的测试和一些想法,如果有不对的地方,还望海涵。



发布于: 1 小时前阅读数: 15
用户头像

持续的思考,不断的实践 2020.07.26 加入

读书犹如吃饭,吃了什么不会记得,但是食物已成为身体的一部分。读书读了什么,很多都已经忘了,但是他却成为了精神的一部分。

评论

发布
暂无评论
Flink集成hive测试及生产规划