Flink on Zeppelin (5) 高级特性篇
在Flink on Zeppelin系列的前面几篇文章中,我讲述了如何在Zeppelin里使用Flink的一些基本操作和配置, 中间也会穿插一些高级feature,但都比较零散,这篇文章会集中重点讲述一些非常实用的Flink on Zeppelin的高级feature。
Dependency management
在Zeppelin,你可以使用Scala,Python 和 SQL。但往往你需要依赖于很多第三方的库,在Zeppelin里,你有下面3种方式来引入第三方依赖库。
flink.execution.jars 所有的jar包都会load到flink interpreter的classpath里,而且会被发送到Task Manager。这个配置主要是用来指定你的flink job所依赖的普通jar包.
flink.udf.jars 这个配置和flink.execution.jars非常像, 不同的地方在于Zeppelin会检测这些jar包中所包含的UDF class,而且会把他们注册到TableEnvironment中。UDF的名字就是这个class name。
flink.execution.packages 这个配置也类似flink.execution.jars,但它不是用来指定jar包,而是用来指定package的。Zeppelin会下载这个package以及这个package的依赖,并且放到flink interpreter的classpath上。比如你想使用kafka connector,那么你需要如下配置 flink.exection.packages成下面的样子
Job Concurrency & Parallelism
在Flink的sql-client里你只能一条接一条的运行Sql,但是在Zeppelin里你可以同时并发跑多个sql job。默认情况下,你可以同时跑10个batch sql job,10个 streaming sql job。如果10对你来说还不够,你可以通过设置下面2个参数进行调节。
zeppelin.flink.concurrentBatchSql.max
zeppelin.flink.concurrentStreamSql.max
除了指定flink的job并发度,你还能通过设置paragraph的local property来指定每个job的并行度。下图是一个简单的例子,在这个例子里我设置了parallelism 为3,那么这个job中涉及到shuffle的过程就会有3个task并行跑。
Sql Job Configuration
在Zeppelin里,你不仅可以配置flink session cluster,也可以针对每个sql job做配置。你可以在这里[1]找到所有的sql job的配置选项。在每个sql paragraph 中(%flink.bsql & %flink.ssql), 你都可以用set语句来进行配置。下面有个简单的例子,在这个例子中我会设置table.exec.window-agg.buffer-size-limit,有一点需要注意的是每个paragraph中的set语句只会影响这个paragraph所对应的flink job。也就是在其他paragraph中的flink job仍然会使用table.exec.window-agg.buffer-size-limit的默认值。
Run multiple insert sql as one flink job
在Flink的sql-client里,你只能一条insert语句跑完接着跑另外一条insert语句。默认情况下Zeppelin也会是这样的行为,但是有时候用户会希望多条insert语句跑在一个flink job里。在Zeppelin中,你只需要做一个简单的配置就能达到这种效果:给当前的paragraph设置local property runAsOne为true。下面就是一个简单的例子。
如果我run这个paragraph,那么就会启动一个flink job,这个flink job会同时run这2条insert sql语句。
Job Isolation
Job isolation对于streaming job来说非常重要。你不会希望一个flink job影响另外一个flink job。但是如果你尝试过之前的Flink on Zeppelin Batch篇,Streaming篇,你会发现这些flink jobs都是共享一个flink session cluster,也就是意味着一个job的失败可能会导致其他job的失败。很幸运的是在Zeppelin里我们只需要做一个小小的配置改动就能达到job的isolation。
我们只要将interpreter设置成per note isolated。这个配置意味着每个note都会有一个独立的flink session cluster。
Keep job alive even when Zeppelin shutdown
除了flink job之间的isolation,你也不希望zeppelin本身影响到你的flink job。即使zeppelin挂了,你也希望你之前提交的job仍然能继续执行。不过默认情况下,如果Zeppelin挂了,zeppelin上所提交的flink job都会被cancel掉。很幸运的是,你只要在配置上稍作修改就能达到zeppelin和flink job 相互隔离的目的。
设置 zeppelin.interpreter.close.cancel_job 为false
设置 flink.interpreter.close.shutdown_cluster 为false
Multiple Hadoop & Hive
有些情况下用户会有多个hadoop集群,比如一个是生产集群,一个是测试开发集群。你不需要为每个集群搭建一个Zeppelin,同一个Zeppelin可以连接多个hadoop集群。你只要创建多个flink interpreter,然后指定相对应的HADOOP_CONF_DIR 和 HIVE_CONF_DIR 就可以了。比如下面的截图里我创建了一个叫 flink_dev的flink interpreter,然后指定它的HADOOP_CONF_DIR 和 HIVE_CONF_DIR。在notebook里你只要用%flink_dev, %flink_dev.bsql, %flink_dev.ssql 就可以了。
Inline Configuration
需要注意的是这个inline configuraiton的paragraph需要在flink session cluster创建之前执行。所以一般情况下这个paragraph都是一个note的第一个paragraph。
Zeppelin Rest API
除了通过Zeppelin web前端用交互式的方式提交flink job之外,你也可以通过zeppelin的rest api来提交job。Zeppelin提供2种方式来提交job,同步阻塞式提交job和异步提交job。
有关rest api的具体使用,请参考这个链接[2]
How to debug/diagnose
很多时候,开发一个flink job总不是一帆风顺,总会遇到各种问题。我会建议你按照下面4个步骤来查问题。
Step 1. 查看Zeppelin前端显示的错误信息
Step 2. 如果是flink job 失败的问题,请查看flink web ui
Step 3. 如果step 1和2都没能发现线索,请查看flink interpreter log。flink interpreter log是在 zeppelin下的logs文件夹里的zeppelin-interpreter-flink-*.log
Step 4. 查看flink cluster的log,如果是yarn模式,查看yarn app log。
如果按照上面4个步骤来查问题,基本上大部分的问题都能找到线索,不过有一点我需要强调的是看log要仔细,不要只是简单扫一下。如果你第一遍按照这4个步骤还是没能发现线索,那么请第二遍再看一次,好好读读log,而不是简单看看。
References
[1].https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/config.html
[3]. http://zeppelin.apache.org/
[4]. https://flink.apache.org/
Zeppelin on Flink (3) Streaming 篇
如果有碰到任何问题,请加入下面这个钉钉群讨论。
Apache Zeppelin 公众号
版权声明: 本文为 InfoQ 作者【章剑锋_Jeff】的原创文章。
原文链接:【http://xie.infoq.cn/article/94246b90e3ffc0e2c1dc75990】。文章转载请联系作者。
评论