写点什么

三步玩转:如何通过 Flink OceanBase CDC 连接器快速查询数据

  • 2022-10-14
    浙江
  • 本文字数:4226 字

    阅读完需:约 1 分钟

CDC (Change Data Capture,即变更数据捕获)能够帮助您监测并捕获数据库的变动。包括数据或数据表的插入、更新和删除等。CDC 将这些变更按发生的顺序完整记录下来,写入到消息中间件中,以供其他服务进行订阅及消费。您可用 CDC 提供的数据做历史库、近实时缓存、提供给消息队列(MQ),用户消费 MQ 做分析和审计等。


Flink CDC (CDC Connectors for Apache Flink) 是 Apache Flink 的一组 Source 连接器,它支持从大多数据库中实时地读取存量历史数据和增量变更数据。 Flink CDC 能够将数据库的全量和增量数据同步到消息队列和数据仓库中。Flink CDC 也可以用于实时数据集成,您可以使用它将数据库数据实时导入数据湖或者数据仓库。同时,Flink CDC 还支持数据加工,您可以通过它的 SQL Client 对数据库数据做实时关联、打宽、聚合,并将结果写入到各种存储中。借助 Flink OceanBase CDC 连接器,您可以使用 SQL DDL 创建一个 CDC 源来监控单个表的变化。您也可以在没有部署 Debezium 和 Apache Kafka 的情况下,在一个作业中消费多个数据库和表的变化。在没有 Flink OceanBase CDC 连接器之前,您不能做实时数据的多流 JOIN。


OceanBase 数据库的 CDC 组件主要有:obcdc(原 liboblog)、oblogmsg、oblogproxy 和 oblogclient。各组件功能见下表:



本文介绍如何使用 Flink OceanBase CDC 连接器。本文将以 Elasticsearch 为目标数据源,演示如何将数据从 OceanBase 数据库迁移至 Elasticsearch。

特性

At-Least-Once 处理

OceanBase CDC 连接器是一个 Flink Source 连接器。它将首先读取数据库快照,然后再读取变化事件,并进行 At-Least-Once 处理


OceanBase 数据库是一个分布式数据库,它的日志也分散在不同的服务器上。由于没有类似 MySQL binlog 偏移量的位置信息,OceanBase 数据库用时间戳作为位置标记。为确保读取完整的数据,liboblog(读取 OceanBase 日志记录的 C++ 库)可能会在给定的时间戳之前读取一些日志数据。因此,OceanBase 数据库可能会读到起始点附近时间戳的重复数据,从而保证了 At-Least-Once 处理

启动模式

配置选项 scan.startup.mode 指定 OceanBase CDC 连接器的启动模式。可用取值包括:


  • initial(默认):在首次启动时对受监视的数据库表执行初始快照,并继续读取最新的事务日志。

  • latest-offset:首次启动时,不对受监视的数据库表执行快照,仅从连接器启动时读取事务日志。

  • timestamp:在首次启动时不对受监视的数据库表执行初始快照,仅从指定的 scan.startup.timestamp 读取事务日志。

消费事务日志

OceanBase CDC 连接器使用 oblogclient 消费 oblogproxy 中的事务日志。

DataStream Source

OceanBase CDC 连接器也可以作为 DataStream Source 使用。您可以创建一个 SourceFunction,例如:


import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.SourceFunction;import com.ververica.cdc.connectors.oceanbase.OceanBaseSource;import com.ververica.cdc.connectors.oceanbase.table.OceanBaseTableSourceFactory;import com.ververica.cdc.connectors.oceanbase.table.StartupMode;import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
public class OceanBaseSourceExample {
public static void main(String[] args) throws Exception { SourceFunction<String> oceanBaseSource = OceanBaseSource.<String>builder() .rsList("127.0.0.1:2882:2881") // set root server list .startupMode(StartupMode.INITIAL) // set startup mode .username("user@test_tenant") // set cluster username .password("pswd") // set cluster password .tenantName("test_tenant") // set captured tenant name, do not support regex .databaseName("test_db") // set captured database, support regex .tableName("test_table") // set captured table, support regex .hostname("127.0.0.1") // set hostname of OceanBase server or proxy .port(2881) // set the sql port for OceanBase server or proxy .logProxyHost("127.0.0.1") // set the hostname of log proxy .logProxyPort(2983) // set the port of log proxy .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String .build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable checkpoint env.enableCheckpointing(3000); env.addSource(oceanBaseSource).print().setParallelism(1);
env.execute("Print OceanBase Snapshot + Commit Log"); }}
复制代码

数据类型映射

当启动模式不是 INITIAL 时,连接器无法获得一个列的精度和比例。为兼容不同的启动模式,连接器不会将一个不同精度的 OceanBase 类型映射到不同的 FLink 类型。例如,BOOLEANTINYINT(1)BIT(1) 均会转换成 BOOLEAN。在 OceanBase 数据库中,BOOLEAN 等同于 TINYINT(1),所以 BOOLEANTINYINT 类型的列在 Flink 中会被映射为 TINYINT,而 BIT(1) 在 Flink 中会被映射为 BINARY(1)


体验 Flink OceanBase CDC 连接器

前提条件

在迁移 OceanBase 的数据之前,您需要确认以下信息:



OceanBase CDC 连接器支持从 OceanBase 数据库读取快照数据和增量数据。本节介绍如何设置 OceanBase CDC 连接器,以在 OceanBase 数据库中查询数据。

依赖

要使用 OceanBase CDC 连接器,您必须提供相关的依赖信息。以下依赖信息适用于使用自动构建工具(如 Maven 或 SBT)构建的项目和带有 SQL JAR 包的 SQL 客户端。


<dependency>  <groupId>com.ververica</groupId>  <artifactId>flink-connector-oceanbase-cdc</artifactId>  <!-- The dependency is available only for stable releases, SNAPSHOT dependency need build by yourself. -->  <version>2.2.1</version></dependency>
复制代码

下载 SQL 客户端 JAR 包

点击 flink-sql-connector-oceanbase-cdc-2.2.1.jar 下载 JAR 包至 <FLINK_HOME>/lib/.


说明:

下载链接仅适用于稳定发行版本。


flink-sql-connector-oceanbase-cdc-XXX-SNAPSHOT 快照版本与开发分支的版本对应。要使用快照版本,您必须自行下载并编译源代码。推荐使用稳定发行版本,例如 flink-sql-connector-oceanbase-cdc-2.2.1.jar。您可以在 Maven 中央仓库中找到使用稳定发行版本。

配置 OceanBase 数据库和 oblogproxy 服务

  1. 按照 部署文档 配置 OceanBase 集群。

  2. 在 sys 租户中,为 oblogproxy 创建一个带密码的用户。更多信息,参考 用户管理文档


    mysql -h${host} -P${port} -uroot    mysql> SHOW TENANT;    mysql> CREATE USER ${sys_username} IDENTIFIED BY '${sys_password}';    mysql> GRANT ALL PRIVILEGES ON *.* TO ${sys_username} WITH GRANT OPTION;
复制代码


  1. 为你想要监控的租户创建一个用户,这个用户用来读取快照数据和变化事件数据。

  2. 获取 rootservice_listconfig-url 的值。如果您是社区版用户,使用以下命令:


    mysql> SHOW PARAMETERS LIKE 'rootservice_list';
复制代码


如果您是企业版用户,使用以下命令:
复制代码


    mysql> SHOW PARAMETERS LIKE 'obconfig_url';
复制代码


  1. 按照 oblogproxy 文档 配置 oblogproxy。

创建 OceanBase CDC 表

使用以下命令,创建 OceanBase CDC 表:


-- 每 3000 毫秒做一次 checkpoint                   Flink SQL> SET 'execution.checkpointing.interval' = '3s';
-- 在 Flink SQL 中创建 OceanBase 表 `orders`Flink SQL> CREATE TABLE orders ( order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN, PRIMARY KEY (order_id) NOT ENFORCED) WITH ( 'connector' = 'oceanbase-cdc', 'scan.startup.mode' = 'initial', 'username' = 'user@test_tenant', 'password' = 'pswd', 'tenant-name' = 'test_tenant', 'database-name' = 'test_db', 'table-name' = 'orders', 'hostname' = '127.0.0.1', 'port' = '2881', 'rootserver-list' = '127.0.0.1:2882:2881', 'logproxy.host' = '127.0.0.1', 'logproxy.port' = '2983');
-- 从表 orders 中读取快照数据和 binlog 数据Flink SQL> SELECT * FROM orders;
复制代码


您也可以访问 Flink 官网文档,快速体验将数据从 OceanBase 导入到 Elasticsearch。更多信息,参考 Flink 官网文档

Flink OceanBase CDC 连接器配置项

支持的元数据

在创建表时,您可以使用以下格式的元数据作为只读列(VIRTUAL)。



如下 SQL 展示了如何在表中使用这些元数据列:


CREATE TABLE products (    tenant_name STRING METADATA FROM 'tenant_name' VIRTUAL,    db_name STRING METADATA FROM 'database_name' VIRTUAL,    table_name STRING METADATA  FROM 'table_name' VIRTUAL,    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,    order_id INT,    order_date TIMESTAMP(0),    customer_name STRING,    price DECIMAL(10, 5),    product_id INT,    order_status BOOLEAN,    PRIMARY KEY(order_id) NOT ENFORCED) WITH (   'connector' = 'oceanbase-cdc',   'scan.startup.mode' = 'initial',   'username' = 'user@test_tenant',   'password' = 'pswd',   'tenant-name' = 'test_tenant',   'database-name' = 'test_db',   'table-name' = 'orders',   'hostname' = '127.0.0.1',   'port' = '2881',   'rootserver-list' = '127.0.0.1:2882:2881',   'logproxy.host' = '127.0.0.1',   'logproxy.port' = '2983');
复制代码


用户头像

企业级原生分布式数据库 2020-05-06 加入

github:https://github.com/oceanbase/oceanbase 欢迎大家

评论

发布
暂无评论
三步玩转:如何通过Flink OceanBase CDC连接器快速查询数据_OceanBase 数据库_InfoQ写作社区