
Submitting Flink Jobs and a Summary of Problems Encountered

2023-06-25 · Jiangsu


I. Submitting a Job

1. The submit command

./bin/flink run [options] <job-jar> <arguments>


Run flink run --help to see the full list of options.

2. Examples

2.1 Without program arguments:


./bin/flink run -c com.yclxiao.cdc.Cdc2DorisSQLDemoWithCheckpoint ./flinkdemo-1.0-SNAPSHOT.jar 
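For reference, the class named with -c is the job's entry point: its main() method builds the pipeline and calls execute(). Below is a minimal, runnable skeleton under that assumption; the package and class name are taken from the command above, and the placeholder pipeline merely stands in for the real CDC-to-Doris logic, which this post does not show.

package com.yclxiao.cdc;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Cdc2DorisSQLDemoWithCheckpoint {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Placeholder pipeline so the skeleton runs on its own; the real job would
        // define the CDC source, transformations and Doris sink here instead.
        env.fromElements("hello flink").print();
        env.execute("Cdc2DorisSQLDemoWithCheckpoint");
    }
}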


2.2 With program arguments:


Each - introduces a parameter name, followed by its value:


./bin/flink run -c com.yclxiao.cdc.Cdc2DorisSQLDemoWithCheckpoint ./flinkdemo-1.0-SNAPSHOT.jar -name ycl -age 11


Inside the job, the arguments can be parsed directly with Flink's built-in ParameterTool:


ParameterTool params = ParameterTool.fromArgs(args);
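Inside main(), the parsed values can then be read back by key (ParameterTool lives in org.apache.flink.api.java.utils). A small sketch assuming the example arguments above; the default values are only illustrative:

// args = {"-name", "ycl", "-age", "11"}
ParameterTool params = ParameterTool.fromArgs(args);
String name = params.get("name", "unknown"); // -> "ycl"
int age = params.getInt("age", 0);           // -> 11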


2.3 Submitting from a checkpoint


Add the -s option, pointing at the checkpoint to restore from:


-s /Users/yclxiao/Project/bigdata/flinkdemo/checkpoints/143ac6febfce4274d24bdff6ec83d1c8/chk-170


Full command:


./bin/flink run -c com.yclxiao.cdc.Cdc2DorisSQLDemoWithCheckpoint -s /Users/yclxiao/Project/bigdata/flinkdemo/checkpoints/143ac6febfce4274d24bdff6ec83d1c8/chk-170 ./flinkdemo-1.0-SNAPSHOT.jar -name ycl -age 11
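For the chk-* directory referenced by -s to exist and survive after the job stops, the job itself must enable checkpointing and retain externalized checkpoints. A minimal sketch of that setup inside main(), assuming Flink 1.15 (CheckpointConfig comes from org.apache.flink.streaming.api.environment); the interval is illustrative and the storage path simply mirrors the directory used in the command above:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000L); // take a checkpoint every 60 seconds
env.getCheckpointConfig()
        .setCheckpointStorage("file:///Users/yclxiao/Project/bigdata/flinkdemo/checkpoints");
env.getCheckpointConfig()
        .setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);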

II. Problems Encountered When Submitting Jobs

Here is a summary of the problems encountered, followed by a detailed walkthrough.

1. Summary

In brief:


1. Not enough resources. Fix: adjust the cluster configuration file.

2. The SPI files under META-INF were not included when packaging. Fix: add a Maven plugin to pom.xml.

3. Dependency configuration in the pom: in Flink SQL scenarios it can conflict with the jars under the cluster's lib directory. Fix: some dependencies should not be packaged at all; any jar that already exists in the Flink cluster's lib directory should not be bundled into the job jar.

4. Issues specific to the public cloud.

2. Details

2.1 Not enough resources


Error message:


2023-06-19 15:42:24,452 WARN  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Could not fulfill resource requirements of job 032dc3ac91b6128aaedae625b36e0575. Free slots: 0
2023-06-19 15:42:24,452 WARN  org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge [] - Could not acquire the minimum required resources, failing slot requests. Acquired: [ResourceRequirement{resourceProfile=ResourceProfile{cpuCores=1, taskHeapMemory=2.283gb (2451151214 bytes), taskOffHeapMemory=0 bytes, managedMemory=2.026gb (2175669399 bytes), networkMemory=518.720mb (543917349 bytes)}, numberOfRequiredSlots=1}]. Current slot pool status: Registered TMs: 1, registered slots: 1 free slots: 0
2023-06-19 15:42:24,454 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: acct_profit[1] -> (Calc[2], Calc[9]) (1/1) (2ff961f809129e63bb6b9b164dd56ca4) switched from SCHEDULED to FAILED on [unassigned resource].
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.

(The same WARN/INFO lines also appear at 15:42:23 with a different execution attempt id.)


The cause may be that the memory is set too low, the allocated parallelism is too small, or too few slots are configured. Reference settings in flink-conf.yaml:


jobmanager.memory.process.size: 2600m
taskmanager.memory.process.size: 2728m
taskmanager.memory.flink.size: 2280m
taskmanager.numberOfTaskSlots: 10
parallelism.default: 4
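If adding memory or slots is not an option, a complementary approach (a sketch, not something from the original setup) is to lower the job's own parallelism so its slot request fits what the registered TaskManagers can offer; with default slot sharing, the slots a job needs are roughly its maximum operator parallelism:

// Inside main(): keep parallelism <= registered TaskManagers * taskmanager.numberOfTaskSlots
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);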


2.2 The mysql-cdc connector cannot be found


This happens because the META-INF/services (SPI) files of all dependencies were not merged when building the fat jar, so Flink cannot discover the mysql-cdc connector factory. Add the following to pom.xml; the ServicesResourceTransformer is the part that merges the SPI files:


<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.2.4</version>
    <configuration>
        <filters>
            <filter>
                <artifact>*:*</artifact>
                <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                </excludes>
            </filter>
        </filters>
    </configuration>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>com.bm001.datacompute.cdc.api.CloudAcctProfit2DwsHdjProfitRecordAPI</mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>


For reference, see:


https://wii.pub/2021/08/23/tools/maven/problems/merge-meta-info/


https://blog.csdn.net/RL_LEEE/article/details/128134800


2.3 Duplicate jars


Sometimes a jar is needed for local development and local runs, but must not be shipped when the job is submitted to the cluster, because the same jar already exists in the cluster's lib directory. In that case you get an error like this:


Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'default' that implement 'org.apache.flink.table.delegation.ExecutorFactory' found in the classpath.


Fix: change the dependency's scope to provided in the pom, so it is available at compile time but is not packaged into the job jar:


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-runtime</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

3. Problems that appear on the public cloud

The job runs fine locally and also on the test server, but a few issues can show up when it is deployed to a public-cloud ECS machine.


3.1 Invalid argument 0.0.0.0:8081


The "invalid argument 0.0.0.0" error seems to be caused by the cloud platform restricting Netty from binding to 0.0.0.0; the test environment does not have this restriction. Configuring the machine's own IP address fixed it:


rest.address: xx.xx.xx.xx
rest.bind-address: xx.xx.xx.xx


3.2 Change the tmp directory, otherwise temporary files eat up space on the system disk


io.tmp.dirs: /data/software/flink-15.4/tmp


3.3 Insufficient database user privileges on the cloud


Fix it by granting the missing privilege to the user in the cloud console; the error looks like this (the MySQL CDC connector typically needs the RELOAD privilege to take its initial snapshot):


Caused by: java.sql.SQLSyntaxErrorException: Access denied; you need (at least one of) the RELOAD privilege(s) for this operation

