写点什么

Flink on Zeppelin (2) - Batch 篇

发布于: 2020 年 06 月 15 日
Flink on Zeppelin (2) - Batch篇

 在Flink on Zeppelin 入门篇 中我们讲述了如何配置 Zeppelin + Flink 来运行一个最简单的 WordCount 例子。本文将讲述如何使用 Flink SQL + UDF 来做 Batch ETL 和 BI 数据分析的任务。

Flink Interpreter 类型

    首先介绍下 Zeppelin 中的 Flink Interpreter 类型。Zeppelin 的 Flink Interpreter 支持 Flink 的所有 API (DataSet, DataStream, Table API )。语言方面支持 Scala,Python,SQL。下图是 Zeppelin 中支持的不同场景下的 Flink Interpreter。



配置 Flink Interpreter

       下图例举了所有重要的 Flink 配置信息,除此之外你还可以配置任意 Flink 的 Configuration(https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html


内置入口变量

      Flink Interpreter (%flink) 为用户自动创建了下面 6 个变量作为 Flink Scala 程序的入口。


  • senv (StreamExecutionEnvironment),

  • benv (ExecutionEnvironment)

  • stenv (StreamTableEnvironment for blink planner)

  • btenv (BatchTableEnvironment for blink planner)

  • stenv_2 (StreamTableEnvironment for flink planner)

  • btenv_2 (BatchTableEnvironment for flink planner)


      PyFlinkInterpreter (%flink.pyflink, %flink.ipyflink) 为用户自动创建了 6 个 python 变量作为 PyFlink 程序的入口

  • s_env (StreamExecutionEnvironment),

  • b_env (ExecutionEnvironment)

  • st_env (StreamTableEnvironment for blink planner)

  • bt_env (BatchTableEnvironment for blink planner)

  • st_env_2 (StreamTableEnvironment for flink planner)

  • bt_env_2 (BatchTableEnvironment for flink planner)


Blink/Flink Planner

Flink 1.10 中有 2 种 table api 的 planner:flink & blink.


  • 如果你用 DataSet api 以及需要把 DataSet 转换成 Table,那么就需要使用 Flink planner 的 TableEnvironment (btenv_2 and stenv_2).

  • 其他场景下, 我们都会建议用户使用blink planner. 这也是 Flink sql 使用的 planner(%flink.bsql & %flink.ssql)


使用 Flink Batch SQL

      %flink.bsql 是用来执行 Flink 的 batch sql. 运行 help 命令可以得到所有可用的命令



总的来说,Flink Batch SQL 可以用来做 2 大任务:

  • 使用 insert into 语句来做 Batch ETL

  • 使用 select 语句来做 BI 数据分析


基于 Bank 数据的 Batch ETL

下面我们基于 Bank (https://archive.ics.uci.edu/ml/datasets/bank+marketing)数据来做 Batch ETL 任务。

  • 首先用 Flink Sql 创建一个 raw 数据的 source table,以及清洗干净后的 sink table。




  • 然后再定义 Table Function 来 parse raw data。



  • 接下来就可以用 insert into 语句来进行数据转换(source table --> sink table)



  •  用 select 语句来 Preview 最终数据,验证 insert into 语句的正确性



基于 Bank 数据的 BI 数据分析

经过上面的数据清洗工作,接下来就可以对数据进行分析了。用户不仅可以使用标准的 SQL Select 语句进行分析,也可以使用 Zeppelin 的 dynamic forms 来增加交互性(TextBox,Select,Checkbox)



使用 Flink UDF

      SQL 虽然强大,但表达能力毕竟有限。有时候就要借助于 UDF 来表达更复杂的逻辑。Flink Interpreter 支持 2 种 UDF (Scala + Python)。下面是 2 个简单的例子。


       Scala UDF

%flink
class ScalaUpper extends ScalarFunction { def eval(str: String) = str.toUpperCase}
btenv.registerFunction("scala_upper", new ScalaUpper())
复制代码


    Python UDF


%flink.pyflink
class PythonUpper(ScalarFunction): def eval(self, s): return s.upper()
bt_env.register_function("python_upper", udf(PythonUpper(), DataTypes.STRING(), DataTypes.STRING()))
复制代码




对 Hive 数据的数据分析

     除了可以分析 Flink SQL 创建的 table 之外,Flink 也可以分析 Hive 上已有的 table。如果要让 Flink Interpreter 使用 Hive,那么需要做以下配置

  • 设置 zeppelin.flink.enableHive 为 true

  • Copy 下面这些 dependencies 到 flink 的 lib 目录

  • flink-connector-hive_{scala_version}-{flink.version}.jar

  • flink-hadoop-compatibility_{scala_version}-{flink.version}.jar

  • flink-shaded-hadoop-2-uber-{hadoop.version}-{flink-shaded.version}.jar

  • hive-exec-2.x.jar (for Hive 1.x, you need to copy hive-exec-1.x.jar, hive-metastore-1.x.jar, libfb303-0.9.2.jar and libthrift-0.9.2.jar)

  • 在 Flink interpreter setting 里或者 zeppelin-env.sh 里指定 HIVE_CONF_DIR

  • 在 Flink interpreter setting 指定 zeppelin.flink.hive.version 为你使用的 Hive 版本


下面就用一个简单的例子展示如何在 Zeppelin 中用 Flink 查询 Hive table


1. 用 Zeppelin 的 jdbc interpreter 查询 hive tables



2. 用 Flink sql 查询 hive table 的 schema



3. 用 Flink Sql 查询 hive table



       本文只是简单介绍如何在 Zeppelin 中使用 Flink SQL + UDF,关于更多 Flink SQL 和 UDF 请参考 Flink 官方文档

  • https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/

  • https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html

  • https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/systemFunctions.html

  • https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/udfs.html


如果有碰到任何问题,请加入下面这个钉钉群讨论。后续我们会有更多 Tutorial 的文章,敬请期待。



Apache Zeppelin 公众号


发布于: 2020 年 06 月 15 日阅读数: 467
用户头像

还未添加个人签名 2017.10.18 加入

Apache Member, PMC of Apache Tez, Livy, Zeppelin. Committer of Apache Pig

评论 (1 条评论)

发布
用户头像
感谢分享原创文章,InfoQ首页推荐。
2020 年 06 月 16 日 10:04
回复
没有更多了
Flink on Zeppelin (2) - Batch篇