写点什么

大数据 -148 Flink 写入 Kudu 实战:自定义 Sink 全流程(Flink 1.11/Kudu 1.17/Java 11)

作者:武子康
  • 2025-11-10
    山东
  • 本文字数:5474 字

    阅读完需:约 18 分钟

大数据-148 Flink 写入 Kudu 实战:自定义 Sink 全流程(Flink 1.11/Kudu 1.17/Java 11)

TL;DR

  • 场景:用 Flink DataStream 将用户数据实时落地到 Apache Kudu 表,走自定义 RichSinkFunction。

  • 结论:示例能跑通,但存在「逐列 apply」「异步错误未收集」「主键冲突」等工程隐患,需小改。

  • 产出:可运行样例 + 版本矩阵 + 错误速查卡。


版本矩阵


实现思路

将数据从 Flink 下沉到 Kudu 的基本思路如下:


  • 环境准备:确保 Flink 和 Kudu 环境正常运行,并配置好相关依赖。

  • 创建 Kudu 表:在 Kudu 中定义要存储的数据表,包括主键和列类型。

  • 数据流设计:使用 Flink 的 DataStream API 读取输入数据流,进行必要的数据处理和转换。

  • 写入 Kudu:通过 Kudu 的连接器将处理后的数据写入 Kudu 表。需要配置 Kudu 客户端和表的相关信息。

  • 执行作业:启动 Flink 作业,实时将数据流中的数据写入 Kudu,便于后续查询和分析。

添加依赖

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId> <artifactId>flink-test</artifactId> <version>1.0-SNAPSHOT</version>
<properties> <maven.compiler.source>11</maven.compiler.source> <maven.compiler.target>11</maven.compiler.target> <flink.version>1.11.1</flink.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties>
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.12</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.kudu</groupId> <artifactId>kudu-client</artifactId> <version>1.17.0</version> </dependency>
</dependencies></project>
复制代码

数据源

new UserInfo("001", "Jack", 18),new UserInfo("002", "Rose", 20),new UserInfo("003", "Cris", 22),new UserInfo("004", "Lily", 19),new UserInfo("005", "Lucy", 21),new UserInfo("006", "Json", 24),
复制代码

自定义下沉器

package icu.wzk.kudu;public class MyFlinkSinkToKudu extends RichSinkFunction<Map<String, Object>> {
private final static Logger logger = Logger.getLogger("MyFlinkSinkToKudu");
private KuduClient kuduClient; private KuduTable kuduTable;
private String kuduMasterAddr; private String tableName; private Schema schema; private KuduSession kuduSession; private ByteArrayOutputStream out; private ObjectOutputStream os;
public MyFlinkSinkToKudu(String kuduMasterAddr, String tableName) { this.kuduMasterAddr = kuduMasterAddr; this.tableName = tableName; }
@Override public void open(Configuration parameters) throws Exception { out = new ByteArrayOutputStream(); os = new ObjectOutputStream(out); kuduClient = new KuduClient.KuduClientBuilder(kuduMasterAddr).build(); kuduTable = kuduClient.openTable(tableName); schema = kuduTable.getSchema(); kuduSession = kuduClient.newSession(); kuduSession.setFlushMode(KuduSession.FlushMode.AUTO_FLUSH_BACKGROUND); }
@Override public void invoke(Map<String, Object> map, Context context) throws Exception { if (null == map) { return; } try { int columnCount = schema.getColumnCount(); Insert insert = kuduTable.newInsert(); PartialRow row = insert.getRow(); for (int i = 0; i < columnCount; i ++) { Object value = map.get(schema.getColumnByIndex(i).getName()); insertData(row, schema.getColumnByIndex(i).getType(), schema.getColumnByIndex(i).getName(), value); OperationResponse response = kuduSession.apply(insert); if (null != response) { logger.error(response.getRowError().toString()); } } } catch (Exception e) { logger.error(e); } }
@Override public void close() throws Exception { try { kuduSession.close(); kuduClient.close(); os.close(); out.close(); } catch (Exception e) { logger.error(e); } }
private void insertData(PartialRow row, Type type, String columnName, Object value) { try { switch (type) { case STRING: row.addString(columnName, value.toString()); return; case INT32: row.addInt(columnName, Integer.valueOf(value.toString())); return; case INT64: row.addLong(columnName, Long.valueOf(value.toString())); return; case DOUBLE: row.addDouble(columnName, Double.valueOf(value.toString())); return; case BOOL: row.addBoolean(columnName, Boolean.valueOf(value.toString())); return; case BINARY: os.writeObject(value); row.addBinary(columnName, out.toByteArray()); return; case FLOAT: row.addFloat(columnName, Float.valueOf(value.toString())); default: throw new UnsupportedOperationException("Unknown Type: " + type); }
} catch (Exception e) { logger.error("插入数据异常: " + e); } }}
复制代码

编写实体

package icu.wzk.kudu;
public class UserInfo {
private String id;
private String name;
private Integer age;
// 省略构造方法、Get、Set 接口}
复制代码

执行建表

package icu.wzk.kudu;public class KuduCreateTable {
public static void main(String[] args) throws KuduException { String masterAddress = "localhost:7051,localhost:7151,localhost:7251"; KuduClient.KuduClientBuilder kuduClientBuilder = new KuduClient.KuduClientBuilder(masterAddress); KuduClient kuduClient = kuduClientBuilder.build();
String tableName = "user"; List<ColumnSchema> columnSchemas = new ArrayList<>(); ColumnSchema id = new ColumnSchema .ColumnSchemaBuilder("id", Type.INT32) .key(true) .build(); columnSchemas.add(id); ColumnSchema name = new ColumnSchema .ColumnSchemaBuilder("name", Type.STRING) .key(false) .build(); columnSchemas.add(name); ColumnSchema age = new ColumnSchema .ColumnSchemaBuilder("age", Type.INT32) .key(false) .build(); columnSchemas.add(age);
Schema schema = new Schema(columnSchemas); CreateTableOptions options = new CreateTableOptions(); // 副本数量为1 options.setNumReplicas(1); List<String> colrule = new ArrayList<>(); colrule.add("id"); options.addHashPartitions(colrule, 3);
kuduClient.createTable(tableName, schema, options); kuduClient.close(); }
}
复制代码

主逻辑代码

package icu.wzk.kudu;public class SinkToKuduTest {
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<UserInfo> dataSource = env.fromElements( new UserInfo("001", "Jack", 18), new UserInfo("002", "Rose", 20), new UserInfo("003", "Cris", 22), new UserInfo("004", "Lily", 19), new UserInfo("005", "Lucy", 21), new UserInfo("006", "Json", 24) ); SingleOutputStreamOperator<Map<String, Object>> mapSource = dataSource .map(new MapFunction<UserInfo, Map<String, Object>>() { @Override public Map<String, Object> map(UserInfo value) throws Exception { Map<String, Object> map = new HashMap<>(); map.put("id", value.getId()); map.put("name", value.getName()); map.put("age", value.getAge()); return map; } });
String kuduMasterAddr = "localhost:7051,localhost:7151,localhost:7251"; String tableInfo = "user"; mapSource.addSink(new MyFlinkSinkToKudu(kuduMasterAddr, tableInfo));
env.execute("SinkToKuduTest"); }
}
复制代码

解释分析

环境设置

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();:初始化 Flink 的执行环境,这是 Flink 应用的入口。

数据源创建

DataStreamSource<UserInfo> dataSource = env.fromElements(...):创建了一个包含多个 UserInfo 对象的数据源,模拟了一个输入流。

数据转换

SingleOutputStreamOperator<Map<String, Object>> mapSource = dataSource.map(...):使用 map 函数将 UserInfo 对象转换为 Map<String, Object>,便于后续处理和写入 Kudu。每个 UserInfo 的属性都被放入一个 HashMap 中。

Kudu 配置信息

String kuduMasterAddr = "localhost:7051,localhost:7151,localhost:7251"; 和 String tableInfo = "user";:定义 Kudu 的主节点地址和目标表的信息。

数据下沉

mapSource.addSink(new MyFlinkSinkToKudu(kuduMasterAddr, tableInfo));:将转换后的数据流添加到 Kudu 的自定义 Sink 中。MyFlinkSinkToKudu 类应该实现了将数据写入 Kudu 的逻辑。

执行作业

env.execute("SinkToKuduTest");:启动 Flink 作业,执行整个数据流处理流程。

测试运行

  • 先运行建表

  • 再运行主逻辑


我们建表之后,确认 user 表存在。然后我们运行 Flink 程序,将数据写入 Kudu。



确认有表后,执行 Flink 程序:


注意事项

  • 并发性:根据 Kudu 集群的规模和配置,可以调整 Flink 作业的并发性,以提高写入性能。

  • 批量写入:Kudu 支持批量插入,可以通过适当配置 Flink 的 sink 来提高性能。

  • 故障处理:确保在作业中处理异常和重试逻辑,以确保数据不会丢失。

  • 监控与调试:使用 Flink 的监控工具和 Kudu 的工具(如 Kudu UI)来监控数据流和性能。

错误速查

其他系列

🚀 AI 篇持续更新中(长期更新)

AI 炼丹日志-29 - 字节跳动 DeerFlow 深度研究框斜体样式架 私有部署 测试上手 架构研究,持续打造实用 AI 工具指南!AI-调查研究-108-具身智能 机器人模型训练全流程详解:从预训练到强化学习与人类反馈🔗 AI模块直达链接

💻 Java 篇持续更新中(长期更新)

Java-154 深入浅出 MongoDB 用 Java 访问 MongoDB 数据库 从环境搭建到 CRUD 完整示例 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!🔗 Java模块直达链接

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解🔗 大数据模块直达链接

发布于: 3 小时前阅读数: 12
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-148 Flink 写入 Kudu 实战:自定义 Sink 全流程(Flink 1.11/Kudu 1.17/Java 11)_Java_武子康_InfoQ写作社区