写点什么

Flink 实践教程 - 入门(7):消费 Kafka 数据写入 PG

  • 2021 年 11 月 14 日
  • 本文字数:2618 字

    阅读完需:约 9 分钟

Flink 实践教程-入门(7):消费 Kafka 数据写入 PG

作者:腾讯云流计算 Oceanus 团队

流计算 Oceanus 简介

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。


本文将您详细介绍如何利用 Python 脚本发送模拟数据到 CKafka 中,之后取 CKakfa 的数据经过简单的算术函数转换存入到 PostgreSQL 中。


操作视频

前置准备

创建流计算 Oceanus 集群

进入 Oceanus 控制台 [1],点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群 [2]。

创建消息队列 CKafka

进入 CKafka 控制台 [3],点击左上角【新建】,即可完成 CKafka 的创建,具体可参考 CKafka 创建实例 [4]。

创建 Topic:

进入 CKafka 实例,点击【topic 管理】>【新建】,即可完成 Topic 的创建,具体可参考 CKafka 创建 Topic [5]。

数据准备:

  • Kafka 客户端:

  • 进入同子网的 CVM 下,启动 Kafka 客户端,模拟发送数据,具体操作参见 运行 Kafka 客户端 [6]。

  • 使用脚本发送:

  • Java:参考 使用 SDK 收发消息 [7]

  • Python:参考如下代码


#!/usr/bin/python3# 首次使用该脚本,需 "pip3 install kafka" 安装kafka模块import jsonimport randomimport timefrom kafka import KafkaProducer
broker_lists = ['10.0.0.29:9092']topic_oceanus_quickstart = 'oceanus7_test1'
producer = KafkaProducer(bootstrap_servers=broker_lists, value_serializer=lambda m: json.dumps(m).encode('ascii'))
def generate_oceanus_test_data(): results = [] for _ in range(0, 10): int_one = random.randint(1000,10000) int_two = random.randint(1,10) random_thr = random.random() msg_kv = {"int_one":int_one,"int_two":int_two,"random_thr":random_thr} results.append(msg_kv) return results
def send_data(topic, msgs): for msg in msgs: import time time.sleep(1) producer.send(topic, msg) print(msg) producer.flush()
if __name__ == '__main__': count = 1 while True: msg_oceanus_test_data = generate_oceanus_test_data() send_data(topic_oceanus_quickstart, msg_oceanus_test_data) time.sleep(30)
复制代码

创建 PostgreSQL 实例

进入 PostgreSQL 控制台 [8],点击左上角【新建】创建实例,具体参考 创建 PostgreSQL 实例 [9]。进入实例数据库,创建 oceanus7_test1 表。


-- 建表语句create table public.oceanus7_test1 (  id            INT,  random_thr    DOUBLE PRECISION,  PRIMARY KEY(id));
复制代码


笔者这里使用 DBeaver 进行外网连接,更多连接方式参考官网文档 连接 PostgreSQL 实例 [10]

流计算 Oceanus 作业

1. 创建 Source

CREATE TABLE `kafka_json_source_table` (    int_one      INT,    int_two      INT,    random_thr   DOUBLE) WITH (  'connector' = 'kafka',  'topic' = 'oceanus7_test1',                -- 替换为您要消费的 Topic  'scan.startup.mode' = 'earliest-offset',   -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp 的任何一种  'properties.bootstrap.servers' = '10.0.0.29:9092',  -- 替换为您的 Kafka 连接地址  'properties.group.id' = 'oceanus_group2',  -- 必选参数, 一定要指定 Group ID  -- 定义数据格式 (JSON 格式)  'format' = 'json',  'json.fail-on-missing-field' = 'false',    -- 如果设置为 false, 则遇到缺失字段不会报错。  'json.ignore-parse-errors' = 'true'        -- 如果设置为 true,则忽略任何解析报错。);
复制代码

2. 创建 Sink

CREATE TABLE jdbc_sink (  id            INT,  random_thr    DOUBLE,  PRIMARY KEY (id) NOT ENFORCED) WITH (  'connector' = 'jdbc',             -- connector 类型为'jdbc'  'url' = 'jdbc:postgresql://10.0.0.236:5432/postgres?currentSchema=public&reWriteBatchedInserts=true',      -- 请替换为您的实际 PostgreSQL 连接参数  'table-name' = 'oceanus7_test1',  -- 需要写入的数据表  'username' = 'root',              -- 数据库用户名(需要提供 INSERT 权限)  'password' = 'Tencent123$',       -- 数据库密码  -- 数据目的 Sink 性能调优参数  'sink.buffer-flush.max-rows' = '5000',  -- 可选参数, 表示每批数据的最大缓存条数, 默认值是 5000  'sink.buffer-flush.interval' = '2s',    -- 可选参数, 表示每批数据的刷新周期, 默认值是 0s  'sink.max-retries' = '3'                -- 可选参数, 表示数据库写入出错时, 最多重试的次数);
复制代码

3. 编写业务 SQL

INSERT INTO jdbc_sinkSELECT  MOD(int_one,int_two)    AS id,  TRUNCATE(random_thr,2)  AS random_thrFROM kafka_json_source_table;
复制代码

总结

本例使用 Python 自动化脚本模拟数据输入到 CKafka,经过简单的算术函数转换后存入 PostgreSQL 中。更多算术函数请参考 算数函数 [11]。

参考链接

[1] Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview


[2] 创建独享集群:https://cloud.tencent.com/document/product/849/48298


[3] CKafka 控制台:https://console.cloud.tencent.com/ckafka/index?rid=1


[4] CKafka 创建实例:https://cloud.tencent.com/document/product/597/54839


[5] Ckafka 创建 Topic:https://cloud.tencent.com/document/product/597/54854


[6] 运行 Kafka 客户端:https://cloud.tencent.com/document/product/597/56840


[7] 使用 SDK 收发消息:https://cloud.tencent.com/document/product/597/54834


[8] PostgreSQL 控制台:https://console.cloud.tencent.com/postgres/index


[9] 创建 PostgreSQL 实例:https://cloud.tencent.com/document/product/409/56961


[10] 连接 PostgreSQL 实例:https://cloud.tencent.com/document/product/409/40429


[11] 算术函数:https://cloud.tencent.com/document/product/849/18080



关注“腾讯云大数据”公众号,技术交流、最新活动、服务专享一站 Get~


流计算 Oceanus 限量秒杀专享活动火爆进行中↓↓


用户头像

还未添加个人签名 2020.06.19 加入

欢迎关注,邀您一起探索数据的无限潜能!

评论

发布
暂无评论
Flink 实践教程-入门(7):消费 Kafka 数据写入 PG