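A materialized view, in data management systems, stores the result of a view's query and computation as a physical table. Accessing the view then no longer requires re-running the query, which improves query efficiency. For workloads that involve frequent, heavy aggregation or complex joins, a materialized view is an effective data architecture pattern for improving performance and reducing resource usage.

(Image source: https://blog.the-pans.com/caching-partially-materialized-views-consistently/)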



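A full refresh clears all existing data in the materialized view on every update and re-inserts the latest query result set. It can be thought of as a TRUNCATE TABLE followed by an INSERT INTO SELECT. Full refresh is simple and direct, but with large data volumes or frequent updates its efficiency and resource consumption can become a problem.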
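The full-refresh idea can be sketched in a few lines. This is a minimal illustration using Python's built-in sqlite3 module rather than any particular production database; the `orders` and `mv_order_totals` tables are hypothetical, and because SQLite has no TRUNCATE TABLE an unqualified DELETE stands in for it.

```python
import sqlite3


def full_refresh(conn: sqlite3.Connection) -> None:
    """Rebuild the summary table from scratch inside one transaction."""
    with conn:  # commits on success, rolls back on error
        # SQLite has no TRUNCATE TABLE; an unqualified DELETE plays that role here.
        conn.execute("DELETE FROM mv_order_totals")
        conn.execute(
            "INSERT INTO mv_order_totals (customer_id, order_count, total_amount) "
            "SELECT customer_id, COUNT(*), SUM(amount) FROM orders GROUP BY customer_id"
        )


conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (order_id INTEGER PRIMARY KEY, customer_id TEXT, amount REAL);
    CREATE TABLE mv_order_totals (
        customer_id TEXT PRIMARY KEY, order_count INTEGER, total_amount REAL);
    INSERT INTO orders (customer_id, amount) VALUES ('c1', 10.0), ('c1', 5.0), ('c2', 7.5);
""")

full_refresh(conn)
# A single new row still forces the whole view to be recomputed.
conn.execute("INSERT INTO orders (customer_id, amount) VALUES ('c2', 2.5)")
full_refresh(conn)

rows = conn.execute(
    "SELECT customer_id, order_count, total_amount FROM mv_order_totals ORDER BY customer_id"
).fetchall()
print(rows)
```

Every call re-reads the entire source table, which is why the cost grows with data volume and refresh frequency.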





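1. Balance updates in financial trading systems



  • Users can see their balance change in real time once a transaction completes.

  • Data consistency requirements are strict, and delays are not acceptable.


  • A bank or stock trading platform updates the account-balance materialized view each time a transaction is committed.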

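2. Real-time stock levels in inventory management systems



  • Inventory must be updated immediately on every sale or return.

  • Overselling must be prevented, so users always see accurate stock data when they query.


  • An e-commerce platform updates the inventory materialized view as soon as a user places an order, keeping the storefront and the back-office system in sync.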

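3. Real-time monitoring and alerting systems

In production systems and IT monitoring platforms, metrics such as CPU utilization, memory usage, and network traffic change frequently, and the system must decide from real-time data whether to raise an alert. Real-time refresh keeps the metrics materialized view current so that anomalies are detected and alerts fire immediately.


  • Key system metrics must be monitored in real time.

  • Any anomaly must be detected in the shortest possible time and trigger the corresponding alerting mechanism.


  • An operations monitoring platform refreshes the metrics materialized view each time new monitoring data is collected, so alert rules are evaluated against the latest data.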

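4. Real-time customer status in customer relationship management (CRM) systems

In a CRM system, customer activity data (phone calls, emails, order records, and so on) changes often. Sales staff want to see a customer's latest interactions and order status in real time so they can follow up promptly. Refreshing the materialized view on every customer data update ensures they see the latest information when they open a customer's details.


  • Sales staff must act on the latest interaction records when following up with a customer.

  • Every change in customer status must be reflected in real time to support timely decisions.


  • Each time a customer places an order, sends an email, or performs another action, the CRM system immediately refreshes the related customer-status materialized view.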

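5. User behavior updates in real-time recommendation systems



  • User actions are frequent, and recommendations must adjust in real time.

  • The data must reflect the user's latest interests and preferences in real time.


  • When a user clicks a product or browses content, the user-behavior materialized view is refreshed so the recommendation system can adjust its results in real time.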



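  1. Use the materialized view refresh capabilities built into the database; Oracle, PostgreSQL, and others provide such capabilities

  2. Use a real-time data platform that supports CDC replication and stream processing, such as Kafka or TapData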




1. Oracle Database

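Oracle supports refresh at transaction commit through materialized views together with materialized view logs.

  • Materialized view log: Oracle requires a log table on the source table that records every insert, update, and delete. The materialized view is refreshed from this log.

  • REFRESH FAST ON COMMIT: creating the materialized view with this option makes Oracle apply the logged changes incrementally each time a transaction commits.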
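To make the log-plus-incremental-refresh idea concrete, here is a rough sketch that imitates it with Python's sqlite3 module. It is not Oracle syntax and not how Oracle implements it internally: the `orders_mlog` table, the triggers, and the `commit_with_refresh` helper are all hypothetical stand-ins for a materialized view log and an on-commit fast refresh.

```python
import sqlite3

SCHEMA = """
CREATE TABLE orders (
    order_id INTEGER PRIMARY KEY, customer_id TEXT NOT NULL, amount REAL NOT NULL);
CREATE TABLE mv_order_totals (
    customer_id TEXT PRIMARY KEY, order_count INTEGER NOT NULL, total_amount REAL NOT NULL);
-- Stand-in for a materialized view log: one delta row per change to orders.
CREATE TABLE orders_mlog (
    customer_id TEXT NOT NULL, d_count INTEGER NOT NULL, d_amount REAL NOT NULL);
CREATE TRIGGER orders_ai AFTER INSERT ON orders BEGIN
    INSERT INTO orders_mlog VALUES (NEW.customer_id, 1, NEW.amount);
END;
CREATE TRIGGER orders_ad AFTER DELETE ON orders BEGIN
    INSERT INTO orders_mlog VALUES (OLD.customer_id, -1, -OLD.amount);
END;
CREATE TRIGGER orders_au AFTER UPDATE ON orders BEGIN
    INSERT INTO orders_mlog VALUES (OLD.customer_id, -1, -OLD.amount);
    INSERT INTO orders_mlog VALUES (NEW.customer_id, 1, NEW.amount);
END;
"""


def fast_refresh(conn: sqlite3.Connection) -> None:
    """Apply only the logged deltas to the summary table, then clear the log."""
    deltas = conn.execute(
        "SELECT customer_id, SUM(d_count), SUM(d_amount) "
        "FROM orders_mlog GROUP BY customer_id"
    ).fetchall()
    for customer_id, d_count, d_amount in deltas:
        cur = conn.execute(
            "UPDATE mv_order_totals SET order_count = order_count + ?, "
            "total_amount = total_amount + ? WHERE customer_id = ?",
            (d_count, d_amount, customer_id),
        )
        if cur.rowcount == 0:  # first time this customer appears in the view
            conn.execute(
                "INSERT INTO mv_order_totals VALUES (?, ?, ?)",
                (customer_id, d_count, d_amount),
            )
    conn.execute("DELETE FROM mv_order_totals WHERE order_count = 0")
    conn.execute("DELETE FROM orders_mlog")


def commit_with_refresh(conn: sqlite3.Connection) -> None:
    """Emulate refresh-on-commit: refresh in the same transaction, then commit."""
    fast_refresh(conn)
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
conn.execute("INSERT INTO orders (customer_id, amount) VALUES ('c1', 10.0)")
conn.execute("INSERT INTO orders (customer_id, amount) VALUES ('c2', 4.0)")
commit_with_refresh(conn)
conn.execute("UPDATE orders SET amount = 12.0 WHERE customer_id = 'c1'")
conn.execute("DELETE FROM orders WHERE customer_id = 'c2'")
commit_with_refresh(conn)

totals = conn.execute("SELECT * FROM mv_order_totals ORDER BY customer_id").fetchall()
print(totals)
```

The refresh cost depends on the number of logged changes rather than on the size of the source table, which is the point of an incremental refresh.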


2. PostgreSQL

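PostgreSQL supports materialized views natively, but refreshes them only on demand with REFRESH MATERIALIZED VIEW. Refresh on commit can be approximated with triggers.

  • Triggers: create AFTER INSERT, AFTER UPDATE, and AFTER DELETE triggers on the source table so that a refresh runs automatically whenever the table changes. The trigger function refresh_materialized_view() is user-defined and is expected to perform the refresh:

CREATE TRIGGER refresh_mv_trigger
AFTER INSERT OR UPDATE OR DELETE ON source_table
FOR EACH STATEMENT
EXECUTE FUNCTION refresh_materialized_view();

  • Scheduler (pg_cron): when changes are frequent, a scheduled job can refresh the view periodically instead.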

3. MySQL

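MySQL has no native materialized views, but triggers combined with a replica table can emulate one and give a similar real-time update effect.

  • Triggers: create triggers on the source table that update the derived table whenever data changes, emulating a materialized view refresh.

  • Replica table: create a redundant table that mirrors the source table's changes. It can be updated manually, or automatically by the triggers.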


  -- Logic that manually updates the materialized view
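The trigger-maintained derived table described above can be tried out with a small sketch. It uses SQLite through Python's sqlite3 module so that it runs standalone; MySQL's trigger syntax differs (for example FOR EACH ROW and DELIMITER handling), and the `stock_moves` and `mv_stock` tables are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE stock_moves (
    move_id INTEGER PRIMARY KEY, sku TEXT NOT NULL, qty INTEGER NOT NULL);
-- Derived table that plays the role of the materialized view.
CREATE TABLE mv_stock (sku TEXT PRIMARY KEY, on_hand INTEGER NOT NULL);

CREATE TRIGGER stock_moves_ai AFTER INSERT ON stock_moves BEGIN
    INSERT OR IGNORE INTO mv_stock (sku, on_hand) VALUES (NEW.sku, 0);
    UPDATE mv_stock SET on_hand = on_hand + NEW.qty WHERE sku = NEW.sku;
END;
CREATE TRIGGER stock_moves_ad AFTER DELETE ON stock_moves BEGIN
    UPDATE mv_stock SET on_hand = on_hand - OLD.qty WHERE sku = OLD.sku;
END;
CREATE TRIGGER stock_moves_au AFTER UPDATE ON stock_moves BEGIN
    UPDATE mv_stock SET on_hand = on_hand - OLD.qty WHERE sku = OLD.sku;
    INSERT OR IGNORE INTO mv_stock (sku, on_hand) VALUES (NEW.sku, 0);
    UPDATE mv_stock SET on_hand = on_hand + NEW.qty WHERE sku = NEW.sku;
END;
""")

with conn:  # the triggers run inside the same transaction as each write
    conn.execute("INSERT INTO stock_moves (sku, qty) VALUES ('A', 10)")
    conn.execute("INSERT INTO stock_moves (sku, qty) VALUES ('A', -3)")
    conn.execute("INSERT INTO stock_moves (sku, qty) VALUES ('B', 5)")
    conn.execute("UPDATE stock_moves SET qty = -4 WHERE sku = 'A' AND qty = -3")
    conn.execute("DELETE FROM stock_moves WHERE sku = 'B'")

stock = conn.execute("SELECT sku, on_hand FROM mv_stock ORDER BY sku").fetchall()
print(stock)
```

Because the triggers run inside the writing transaction, the derived table is never stale, but every write to the source table pays for the extra updates.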


4. Snowflake

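Snowflake offers a Materialized Views feature that refreshes incrementally over large data sets. Snowflake has no on-commit refresh, but its automatic refresh keeps the data close to real time.

  • Materialized View: Snowflake detects changes to the source table automatically and refreshes the materialized view incrementally when needed. The refresh runs asynchronously, so there is a short delay after a transaction commits.


CREATE MATERIALIZED VIEW mv_example AS
SELECT column1, COUNT(*)
FROM source_table
GROUP BY column1;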

5. ClickHouse

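ClickHouse provides a materialized view mechanism for near-real-time processing. Writes to the source table that the view depends on automatically trigger an update of the materialized view.

  • Materialized Views: ClickHouse maps new data written to a source table into the materialized view. The POPULATE option additionally loads the data already in the source table when the view is created.

  • Distributed stream processing: ClickHouse processes data as it arrives, which suits fast analysis of large volumes of real-time data.

CREATE MATERIALIZED VIEW mv_example
TO target_table
AS SELECT * FROM source_table;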

6. BigQuery

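Google BigQuery supports materialized views. They are not updated on every data change, but they are refreshed periodically. Where the business requires it, a refresh can be forced when data is committed, which approximates real-time updates.

  • Periodic refresh: BigQuery can refresh a materialized view automatically every 30 minutes, and a refresh can also be triggered manually through a programmatic interface (for example from Google Cloud Functions).

CREATE MATERIALIZED VIEW mv_example AS
SELECT column1, COUNT(*)
FROM source_table
GROUP BY column1;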

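As the list shows, only Oracle offers native, transaction-level real-time refresh. The others rely on triggers, or on scheduled or asynchronous automatic refresh, to approximate it, which is not ideal when real-time requirements are strict.

Relying on the database's own capability also means the materialized view can only live inside that database. That rules it out for multi-source or cross-database views, for read/write separation, and for cases where the source database must not take on extra load. In those situations we need a real-time data platform that supports CDC replication and stream processing.

Updating materialized views in real time with CDC replication and stream processing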


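  • A CDC replication tool, which listens to the source database's transaction log, parses it, and hands each change to the processing framework as soon as it appears. Debezium is the usual open-source choice; Oracle GoldenGate is a common commercial one.

  • Stream processing, which applies the Insert / Update / Delete events delivered by CDC to the target view and can join and aggregate events from multiple tables.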
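The second capability, applying change events to a target view while joining several tables, can be illustrated with a small in-memory sketch. The class name, the event shape `(table, op, row)`, and the field names are assumptions made for this illustration; a real pipeline would persist its state and buffer child rows that arrive before their parent.

```python
from typing import Any, Dict

Row = Dict[str, Any]


class OrderViewMaterializer:
    """Keeps a denormalized order view up to date from per-table change events."""

    def __init__(self) -> None:
        self.customers: Dict[str, Row] = {}  # lookup state for the join
        self.view: Dict[str, Row] = {}       # order_id -> wide document

    def apply(self, table: str, op: str, row: Row) -> None:
        handlers = {
            "orders": self._on_order,
            "customers": self._on_customer,
            "order_items": self._on_item,
        }
        handlers[table](op, row)

    def _on_order(self, op: str, row: Row) -> None:
        if op == "delete":
            self.view.pop(row["order_id"], None)
            return
        doc = self.view.setdefault(row["order_id"], {"order_items": []})
        doc.update(row)
        doc["customer_info"] = self.customers.get(row["customer_id"])

    def _on_customer(self, op: str, row: Row) -> None:
        cid = row["customer_id"]
        if op == "delete":
            self.customers.pop(cid, None)
        else:
            self.customers[cid] = row
        # Propagate the change to every order that embeds this customer.
        for doc in self.view.values():
            if doc.get("customer_id") == cid:
                doc["customer_info"] = self.customers.get(cid)

    def _on_item(self, op: str, row: Row) -> None:
        doc = self.view.get(row["order_id"])
        if doc is None:
            return  # parent order not seen yet; this sketch simply drops the event
        items = [i for i in doc["order_items"]
                 if i["order_item_id"] != row["order_item_id"]]
        if op != "delete":
            items.append(row)
        doc["order_items"] = items


m = OrderViewMaterializer()
m.apply("customers", "insert", {"customer_id": "c1", "customer_city": "santos"})
m.apply("orders", "insert", {"order_id": "o1", "customer_id": "c1", "order_status": "shipped"})
m.apply("order_items", "insert", {"order_id": "o1", "order_item_id": 1, "price": 10.0})
m.apply("order_items", "update", {"order_id": "o1", "order_item_id": 1, "price": 12.0})
m.apply("customers", "update", {"customer_id": "c1", "customer_city": "CITY_santos"})
m.apply("orders", "update", {"order_id": "o1", "customer_id": "c1", "order_status": "delivered"})
print(m.view["o1"])
```

Note that a change to one customer row fans out to every order that embeds it, which is the part a plain table-to-table replication does not handle.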

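We use a wide order table as the example. We run an e-commerce platform on MySQL and want to offer an order API with complete information (customer, product, logistics, and so on) for customers to query from their phones. Because MySQL's performance on concurrent and join-heavy queries is limited, we chose to build the materialized view in MongoDB, which offers relatively high query performance and supports a JSON structure that matches the API model.

In other words, suppose this is the MySQL table structure:

We want a view that clients can query directly by order_id or customer_id. The API's JSON might look like the following: a single model containing the order, the customer's address, payment information, and order items.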

{
    "_id": ObjectId("66f7e633f72882271da1a2ec"),
    "order_id": "0005a1a1728c9d785b8e2b08b904576c",
    "customer_id": "16150771dfd4776261284213b89c304e",
    "order_approved_at": "2018-03-20T18:35:21.000+00:00",
    "order_delivered_carrier_date": "2018-03-28T00:37:42.000+00:00",
    "order_delivered_customer_date": "2018-03-29T18:17:31.000+00:00",
    "order_estimated_delivery_date": "2018-03-29T00:00:00.000+00:00",
    "order_purchase_timestamp": "2018-03-19T18:40:33.000+00:00",
    "order_status": "delivered",
    "customer_info": {
        "customer_city": "santos",
        "customer_id": "16150771dfd4776261284213b89c304e",
        "customer_state": "SP",
        "customer_unique_id": "639d23421f5517f69d0c3d6e6564cf0e",
        "customer_zip_code_prefix": "11075"
    },
    "order_items": [
        {
            "order_item_id": 1,
            "price": 145.9499969482422,
            "product_id": "310ae3c140ff94b03219ad0adc3c778f",
            "order_id": "0005a1a1728c9d785b8e2b08b904576c",
            "freight_value": 11.649999618530273,
            "seller_id": "a416b6a846a11724393025641d4edd5e",
            "shipping_limit_date": "2018-03-26T18:31:29.000+00:00",
            "seller": {
                "seller_city": "sao paulo",
                "seller_id": "a416b6a846a11724393025641d4edd5e",
                "seller_state": "SP",
                "seller_zip_code_prefix": "03702"
            },
            "product": {
                "product_category_name": "beleza_saude",
                "product_description_lenght": 493,
                "product_description_length": null,
                "product_height_cm": 12,
                "product_id": "310ae3c140ff94b03219ad0adc3c778f",
                "product_length_cm": 30,
                "product_name_lenght": 59,
                "product_name_length": null,
                "product_photos_qty": 1,
                "product_weight_g": 2000,
                "product_width_cm": 16
            }
        },
        .........
    ]
}

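To get there, we join the orders table (ecom_orders) with the order items table (ecom_order_items) and the customers table (ecom_customers) into one wide table (OrderView), and keep it continuously refreshed with the MySQL Debezium Connector, Kafka Connect, a Kafka broker, and a Kafka consumer application that does the stream processing. The complete steps follow.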



  • Docker (for Kafka, Zookeeper, Schema registry and Kafka Connect)

  • All docker images can be downloaded from https://hub.docker.com/

Step 1: Set up Kafka broker, Zookeeper, Schema registry, and Kafka Connect

name: kakfa-project
services:
  zookeeper:
    image: bitnami/zookeeper
    ports:
      - '31000:31000'
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      KAFKA_JMX_HOSTNAME: "localhost"
      # Quoted so that Compose receives a string rather than a YAML boolean
      ALLOW_ANONYMOUS_LOGIN: "yes"
      KAFKA_JMX_PORT: 31000

  kafka:
    # An important note about accessing Kafka from clients on other machines:
    # -----------------------------------------------------------------------
    #
    # The config used here exposes port 9092 for _external_ connections to the broker
    # i.e. those from _outside_ the docker network. This could be from the host machine
    # running docker, or maybe further afield if you've got a more complicated setup.
    # If the latter is true, you will need to change the value 'localhost' in
    # KAFKA_ADVERTISED_LISTENERS to one that is resolvable to the docker host from those
    # remote clients
    #
    # For connections _internal_ to the docker network, such as from other services
    # and components, use kafka:29092.
    #
    # See https://rmoff.net/2018/08/02/kafka-listeners-explained/ for details
    #
    image: bitnami/kafka
    ports:
      - '9092:9092'
      - '31001:31001'
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      # Define both listeners
      KAFKA_LISTENERS: PLAINTEXT://,PLAINTEXT_HOST://
      # Match advertised listeners with the defined listeners
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
      CONFLUENT_METRICS_ENABLE: 'false'
      KAFKA_JMX_HOSTNAME: "localhost"
      KAFKA_JMX_PORT: 31001

  schema-registry:
    image: confluentinc/cp-schema-registry
    ports:
      - '8081:8081'
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "kafka:29092"
      SCHEMA_REGISTRY_HOST_NAME: "schema-registry"
      SCHEMA_REGISTRY_LISTENERS: ""

  kafka-connect:
    image: confluentinc/cp-kafka-connect-base
    ports:
      - '8083:8083'
      - '31004:31004'
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:29092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
      KAFKA_JMX_HOSTNAME: "localhost"
      KAFKA_JMX_PORT: 31004
    depends_on:
      - zookeeper
      - kafka
      - schema-registry

# run docker-compose.yml file
docker-compose up

Step 2 Install Debezium MySQL Connector in Kafka Connect

  • Install the Debezium MySQL connector using confluent-hub-client inside the Kafka Connect container:

docker exec -it kafka-connect /usr/bin/confluent-hub install debezium/debezium-connector-mysql:latest --component-dir /usr/share/confluent-hub-components --no-prompt
  • Restart the Kafka Connect container after installation:

docker restart kafka-connect

Step 3 Deploy a Debezium MySQL Connector

  • Prepare a JSON configuration file for the MySQL source connector. Below is an example configuration (debezium-mysql.json):

{
    "name": "mysql-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "",
        "database.port": "3306",
        "database.user": "root",
        "database.password": "YJ983g!",
        "snapshot.mode": "initial",
        "database.server.id": "184054",
        "database.server.name": "dbserver1.DEMO",
        "table.include.list": "ECommerce.ecom_customers,ECommerce.ecom_orders,ECommerce.ecom_order_items",
        "database.history.kafka.bootstrap.servers": "kafka:29092",
        "database.history.kafka.topic": "dbhistory.fullfillment",
        "topic.prefix": "dbserver1",
        "database.history.kafka.schema.registry.url": "http://schema-registry:8081",
        "schema.history.internal.kafka.bootstrap.servers": "kafka:29092",
        "schema.history.internal.kafka.topic": "umer-test-history-topic"
    }
}
  • Deploy the connector by posting this configuration to the Kafka Connect REST API:

curl -X POST -H "Content-Type: application/json" --data @debezium-mysql.json http://localhost:8083/connectors
  • Verify the connector status to ensure it’s running:

curl http://localhost:8083/connectors/mysql-connector/status

If the connector status shows as RUNNING, your MySQL data is now streaming into the Kafka broker in real time. Each change (insert/update/delete) made to the MySQL database will be captured by the Debezium MySQL connector and sent to the Kafka broker.
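When checking the status programmatically, it helps to look at the tasks as well as the connector, because Kafka Connect reports a state for each of them separately. The helper below is a small sketch that inspects an already-decoded status response; the function name and the hand-written `sample` payload are illustrative, not captured from a real cluster.

```python
from typing import Any, Dict


def connector_is_running(status: Dict[str, Any]) -> bool:
    """Return True only if the connector and every one of its tasks report RUNNING."""
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    tasks = status.get("tasks", [])
    tasks_ok = bool(tasks) and all(t.get("state") == "RUNNING" for t in tasks)
    return connector_ok and tasks_ok


# Hand-written example of a status document in which the connector is up
# but its only task has failed.
sample = {
    "name": "mysql-connector",
    "connector": {"state": "RUNNING", "worker_id": "kafka-connect:8083"},
    "tasks": [{"id": 0, "state": "FAILED", "worker_id": "kafka-connect:8083"}],
    "type": "source",
}

print(connector_is_running(sample))
```

In practice the dictionary would come from decoding the JSON body returned by the status endpoint shown above.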

At this stage, you have successfully set up real-time data streaming from MySQL to Kafka broker.
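For orientation, a Debezium change event carries the row state before and after the change plus an operation code. The sketch below maps such an event to an upsert or a delete. It assumes the events are serialized as JSON, whereas the Compose file above configures the Avro converter, so treat it as an illustration of the envelope rather than a drop-in consumer; the `to_change` function and the sample events are hypothetical.

```python
import json
from typing import Any, Dict, Optional, Tuple

Row = Dict[str, Any]


def to_change(raw_value: Optional[bytes]) -> Optional[Tuple[str, Row]]:
    """Map a Debezium change event to ('upsert' | 'delete', row), or None to skip."""
    if raw_value is None:
        return None  # tombstone record that can follow a delete
    event = json.loads(raw_value)
    envelope = event.get("payload", event)  # unwrap when the schema is embedded
    op = envelope.get("op")
    if op in ("c", "u", "r"):  # create, update, snapshot read
        return "upsert", envelope["after"]
    if op == "d":
        return "delete", envelope["before"]
    return None  # other operation codes are ignored in this sketch


update_event = json.dumps({
    "before": {"order_id": "o1", "order_status": "shipped"},
    "after": {"order_id": "o1", "order_status": "delivered"},
    "op": "u",
    "ts_ms": 0,
}).encode()
delete_event = json.dumps({
    "payload": {"before": {"order_id": "o1"}, "after": None, "op": "d", "ts_ms": 0}
}).encode()

print(to_change(update_event))
print(to_change(delete_event))
```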

  • Verify the Kafka topics in the Kafka broker

docker exec -it kafka bash
cd /opt/bitnami/kafka/bin
./kafka-topics.sh --bootstrap-server localhost:9092 --list

Step 4 Real-Time Data Streaming from Kafka broker to MongoDB

Now that the data is streaming in real time from MySQL to Kafka, you can consume this data and map it to MongoDB using a custom Node.js script. This application uses the kafkajs client library to consume messages from Kafka topics and the mongodb library to store this data in MongoDB.

In this example, we have an eCommerce database with orders, order items and customer details. We consume this data from Kafka topics and enrich order with related customer information and order items before writing it to MongoDB.

const { Kafka } = require('kafkajs');
const { MongoClient } = require('mongodb');

const kafka = new Kafka({
  clientId: 'qa-01',
  brokers: [''],
  sasl: {
    mechanism: 'plain',
    username: 'tapdata',
    password: 'VVVIy676!',
  },
});

const mongoUrl = 'mongodb://root:NTUOi37$!@';
const mongoClient = new MongoClient(mongoUrl, { useUnifiedTopology: true });

let db;
const orders = new Map(); // Map to temporarily store orders
const customers = new Map(); // Map to store customer info

async function connectMongo() {
  try {
    await mongoClient.connect();
    db = mongoClient.db('orderSingleView');
    console.log("Successfully connected to MongoDB.");
  } catch (error) {
    console.error("Failed to connect to MongoDB:", error);
    throw error; // Without a database handle the consumers cannot do anything useful
  }
}

async function consumeEcomOrders() {
  const ecomOrdersConsumer = kafka.consumer({ groupId: 'qa_orders_x1_topic_group60' });
  await ecomOrdersConsumer.connect();
  await ecomOrdersConsumer.subscribe({ topic: 'dbserver1.ecom_orders_topic', fromBeginning: true });
  await ecomOrdersConsumer.run({
    eachMessage: async ({ message }) => {
      const ecomOrderData = JSON.parse(message.value.toString());
      const orderId = ecomOrderData.order_id; // Adjusted to match your data structure
      // Relies on the message carrying an `mqOp` header with the operation type
      const mqOp = message.headers.mqOp.toString();

      if (mqOp === 'delete') {
        await db.collection('orderSingleView').deleteOne({ order_id: orderId });
        orders.delete(orderId); // Remove from orders map
        console.log(`Deleted Order with ID: ${orderId}`);
      } else {
        // Store order data in the Map
        orders.set(orderId, { ...ecomOrderData });
        console.log(`Inserted/Updated Order: ${JSON.stringify(ecomOrderData)}`);

        // Insert the order into MongoDB
        await db.collection('orderSingleView').updateOne(
          { order_id: orderId },
          { $set: { ...ecomOrderData } },
          { upsert: true }
        );

        // Check for customer info immediately after inserting the order
        await enrichOrderWithCustomerInfo(orderId, ecomOrderData.customer_id);
      }
    },
  });
}

async function consumeEcomCustomersDetails() {
  const customerConsumer = kafka.consumer({ groupId: 'qa_customers_x1_topic_group60' });
  await customerConsumer.connect();
  await customerConsumer.subscribe({ topic: 'dbserver1.ecom_customers_topic', fromBeginning: true });
  await customerConsumer.run({
    eachMessage: async ({ message }) => {
      const customerDetailData = JSON.parse(message.value.toString());
      const customerId = customerDetailData.customer_id; // Adjusted to match your data structure
      const mqOp = message.headers.mqOp.toString();

      if (mqOp === 'delete') {
        await db.collection('orderSingleView').updateMany(
          { 'customer_info.customer_id': customerId },
          { $unset: { customer_info: "" } }
        );
        customers.delete(customerId); // Keep the in-memory map consistent with MongoDB
        console.log(`Deleted Customer Info for Customer ID: ${customerId}`);
      } else {
        // Store customer data in the Map
        customers.set(customerId, customerDetailData);
        console.log(`Inserted/Updated Customer Info for Customer ID: ${customerId}`);

        // Enrich existing orders if customer_id matches
        await enrichExistingOrders(customerId);
      }
    },
  });
}

async function consumeOrderItems() {
  const orderItemsConsumer = kafka.consumer({ groupId: 'qa_order_items_topic_group60' });
  await orderItemsConsumer.connect();
  await orderItemsConsumer.subscribe({ topic: 'dbserver1.ecom_order_items_topic', fromBeginning: true });
  await orderItemsConsumer.run({
    eachMessage: async ({ message }) => {
      const orderItemData = JSON.parse(message.value.toString());
      const orderId = orderItemData.order_id; // Get the order_id from order_item
      const itemId = orderItemData.order_item_id;
      const mqOp = message.headers.mqOp.toString();
      const collection = db.collection('orderSingleView');

      // Remove any existing copy of this item. Matching on order_item_id keeps
      // the other items of the same order in place.
      await collection.updateOne(
        { order_id: orderId },
        { $pull: { order_items: { order_item_id: itemId } } }
      );

      if (mqOp === 'delete') {
        console.log(`Deleted Order Item with ID: ${itemId} from Order ID: ${orderId}`);
      } else {
        // Append the current version of the item to the order's items array
        const result = await collection.updateOne(
          { order_id: orderId },
          { $push: { order_items: orderItemData } },
          { upsert: false } // Don't create a new document if not found
        );

        if (result.matchedCount === 0) {
          console.log(`No existing order found for Order ID: ${orderId}; order item skipped`);
          // Optionally, you could handle this case as needed
        } else {
          console.log(`Inserted/Updated Order Item for Order ID: ${orderId}`);
        }
      }
    },
  });
}

async function enrichExistingOrders(customerId) {
  for (const [orderId, order] of orders) {
    if (order.customer_id === customerId) {
      await enrichOrderWithCustomerInfo(orderId, customerId);
    }
  }
}

async function enrichOrderWithCustomerInfo(orderId, customerId) {
  if (customers.has(customerId)) {
    const customerInfo = customers.get(customerId);
    try {
      await db.collection('orderSingleView').updateOne(
        { order_id: orderId },
        { $set: { customer_info: customerInfo } }
      );
      console.log(`Enriched Order with Customer Info for Order ID: ${orderId}`);
    } catch (error) {
      console.error(`Failed to enrich order ${orderId} with customer info: ${error.message}`);
    }
  } else {
    console.log(`No customer found for Customer ID: ${customerId}. Waiting for customer info...`);
  }
}

(async () => {
  await connectMongo();
  await consumeEcomOrders();
  await consumeEcomCustomersDetails();
  await consumeOrderItems(); // Start consuming order items
})();


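Combining the Debezium MySQL connector with Kafka Connect makes it straightforward to deliver change data capture (CDC) events to a Kafka broker. A Node.js consumer built on the kafkajs client library then processes and transforms the stream in real time. This setup captures updates from the MySQL database, processes them as they arrive, and transforms and maps the results before storing them in MongoDB.

Tap Flow: a CDC-enabled tool for building materialized views

What is Tap Flow

Tap Flow is a streaming data collection and processing framework provided by the TapData real-time data platform. Developers can use it for real-time data replication, real-time data processing, streaming merges of multiple tables, building materialized views that update in real time, and similar scenarios.

Using the same example as above, let's see what the experience is like with Tap Flow.

Building a wide order table with Tap Flow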


  1. Install the Tap Flow Python SDK and CLI

  2. Configure the TapData cluster connection

  3. Use Tap Flow commands and APIs to build a Flow and set its target to a materialized view

  4. Run the Flow


Step 1: Install Tap Shell, the Python SDK and interactive command-line interface for Tap Flow

# prerequisites: install python3 & pip3 before installing tapshell
# Install TapShell using pip
maximus@Reid:~/home  pip3 install tapflow

Step 2: Start and Configure Tap Shell

  # Enter the tapcli directory, type tap, and press Enter
maximus@Reid:~/ tap
Mon Nov 4 12:34:48 CST 2024 Welcome to TapData Live Data Platform, Enjoy Your Data Trip !
Tap Flow requires TapData Live Data Platform(LDP) cluster to run.
If you would like to use with TapData Enterprise or TapData Community, type L to continue.
If you would like to use TapData Cloud, or you are new to TapData, type C or press ENTER to continue.
Please type L or C (L/[C]): C
You may obtain the keys by log onto TapData Cloud, and click: 'User Center' on the top right, then copy & paste the accesskey and secret key pair.
Enter AK: xxxxxxxxxxxxxxxxxxx
Enter SK: xxxxxxxxxxxxxxxxxxx
Mon Oct 21 15:53:50 CST 2024 connecting remote server: https://cloud.tapdata.net ...
Mon Oct 21 15:53:50 CST 2024 Welcome to TapData Live Data Platform, Enjoy Your Data Trip !
========================================================================================================================
TapData Cloud Service Running Agent: 1
Agent name: agent-jk6453h (Machine), ip:, cpu usage: 40%

tap >

# If you're using TapData Enterprise, type L, then provide the server URL with port and the access code, for example: && 123e4567-e89b-12d3-a456-426614174000. You can find the access code by logging into the TapData Enterprise platform and navigating to Account Settings.
Mon Nov 4 12:34:48 CST 2024 Welcome to TapData Live Data Platform, Enjoy Your Data Trip !
Tap Flow requires TapData Live Data Platform(LDP) cluster to run.
If you would like to use with TapData Enterprise or TapData Community, type L to continue.
If you would like to use TapData Cloud, or you are new to TapData, type C or press ENTER to continue.
Please type L or C (L/[C]): L
Please enter server:port of TapData LDP server: enter access code: xxxxxxxxxxxxxxxxxxxxxxxxxx
Mon Oct 21 11:26:55 CST 2024 connecting remote server: you ...
Mon Oct 21 11:26:55 CST 2024 Welcome to TapData Live Data Platform, Enjoy Your Data Trip !

tap >

Step 3: Start Building Materialized View

  Step 3.1: Set Up Connection with Source databases.

# Connect to the source MySQL database
tap > mysqlJsonConfig = {
    'database': 'Demo',
    'port': 3306,
    'host': 'demo.tapdata.io',
    'username': 'demouser',
    'password': 'demopass'
};
tap > mysqlConn = DataSource('mysql', 'qa-mySqlEcommerceData', mysqlJsonConfig).type('source').save();
datasource qa-mySqlEcommerceData creating, please wait...
save datasource qa-mySqlEcommerceData success, will load schema, please wait...
load schema status: finished

Upon successful signup, TapFlow automatically provisions a managed MongoDB Atlas instance for the user. This instance, referenced by the DEFAULT_SINK variable, serves as the destination for materialized views or tables created from source databases

  Step 3.2: Create data pipeline to build wide order data model

# Create the flow and set the base (master) table "ecom_orders"
tap> orderFlow = Flow("Order_SingleView_Sync").read_from("qa-mySqlEcommerceData.ecom_orders");
Flow updated: source added

# Look up the 'ecom_customers' table and add it as an embedded document in 'orders', using customer_id as the association key.
# In MongoDB, path="customer_info" embeds it under the field name customer_info, and type="object" stores it as an embedded document.
tap> orderFlow.lookup("qa-mySqlEcommerceData.ecom_customers", path="customer_info", type="object", relation=[["customer_id","customer_id"]]);
Flow updated: source added
Flow updated: new table <tapflow.lib.data_pipeline.nodes.source.Source object at 0x7f3eb64d66e0> added as child table

# Look up the 'ecom_order_payments' table and add it as an embedded array in 'orders', using order_id as the association key.
# In MongoDB, path="order_payments" embeds it under the field name order_payments, and type="array" stores it as an embedded array.
tap> orderFlow.lookup("qa-mySqlEcommerceData.ecom_order_payments", path="order_payments", type="array", relation=[["order_id","order_id"]]);
Flow updated: source added
Flow updated: new table <tapflow.lib.data_pipeline.nodes.source.Source object at 0x7f3eb6723e50> added as child table

# Look up the 'ecom_order_items' table and add it as an embedded array in 'orders', using order_id as the association key.
# In MongoDB, path="order_items" embeds it under the field name order_items, and type="array" stores it as an embedded array.
tap> orderFlow.lookup("qa-mySqlEcommerceData.ecom_order_items", path="order_items", type="array", relation=[["order_id","order_id"]]);
Flow updated: source added
Flow updated: new table <tapflow.lib.data_pipeline.nodes.source.Source object at 0x7f3eb6864160> added as child table

# Look up the 'ecom_products' table and add it as an embedded document in 'order_items', using product_id as the association key.
tap> orderFlow.lookup("qa-mySqlEcommerceData.ecom_products", path="order_items.product", type="object", relation=[["order_items.product_id","product_id"]]);
Flow updated: source added
Flow updated: new table <tapflow.lib.data_pipeline.nodes.source.Source object at 0x7f3e4573e50> added as child table

# Look up the 'ecom_sellers' table and add it as an embedded document in 'order_items', using seller_id as the association key.
tap> orderFlow.lookup("qa-mySqlEcommerceData.ecom_sellers", path="order_items.seller", type="object", relation=[["order_items.seller_id","seller_id"]]);
Flow updated: source added
Flow updated: new table <tapflow.lib.data_pipeline.nodes.source.Source object at 0x7f3y94853e50> added as child table

# Write data to the target MongoDB
tap> orderFlow.write_to(DEFAULT_SINK.orderSingleView);
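To see what shape the lookups above are described as producing, the following plain-Python sketch embeds child rows into parent rows as either a single object or an array. It is only an illustration of the resulting document structure, not TapFlow's implementation or API: the `embed` function and its parameters are hypothetical, it handles top-level paths only, and it does a one-off batch join with no incremental maintenance.

```python
from typing import Any, Dict, Iterable, List

Row = Dict[str, Any]


def embed(parents: List[Row], children: Iterable[Row], path: str, kind: str,
          parent_key: str, child_key: str) -> None:
    """Attach matching child rows to each parent under `path` (top-level fields only)."""
    by_key: Dict[Any, List[Row]] = {}
    for child in children:
        by_key.setdefault(child[child_key], []).append(child)
    for parent in parents:
        matches = by_key.get(parent[parent_key], [])
        if kind == "array":
            parent[path] = list(matches)
        elif kind == "object":
            parent[path] = matches[0] if matches else None
        else:
            raise ValueError(f"unsupported kind: {kind!r}")


orders = [
    {"order_id": "o1", "customer_id": "c1"},
    {"order_id": "o2", "customer_id": "c9"},  # no matching customer or items
]
customers = [{"customer_id": "c1", "customer_city": "santos"}]
items = [
    {"order_id": "o1", "order_item_id": 1},
    {"order_id": "o1", "order_item_id": 2},
]

embed(orders, customers, path="customer_info", kind="object",
      parent_key="customer_id", child_key="customer_id")
embed(orders, items, path="order_items", kind="array",
      parent_key="order_id", child_key="order_id")
print(orders[0])
```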

  Step 3.3. Start Data pipeline

# Start data flow
tap> orderFlow.start();
Order_SingleView_Sync Pipeline is running

  Step 3.4. View the flow stats

# view Flow stats
tap> stats Order_SingleView_Sync

  Step 3.5. View Wide Order Data model in MongoDB

Verifying that the materialized view updates in real time

  • Run the scripts below and watch the order data change in the MySQL database


Use ECommerceData;

select count(*) from ecom_orders eo;

  Run the following script to add records to the ecom_orders table:

DELIMITER //
CREATE PROCEDURE InsertRandomOrders()
BEGIN
    DECLARE i INT DEFAULT 0;
    -- Disable foreign key checks
    SET FOREIGN_KEY_CHECKS = 0;

    WHILE i < 10 DO
        INSERT INTO ECommerceData.ecom_orders (
            order_id,
            customer_id,
            order_status,
            order_purchase_timestamp,
            order_approved_at,
            order_delivered_carrier_date,
            order_delivered_customer_date,
            order_estimated_delivery_date
        )
        VALUES (
            CONCAT('ORD_I_', UUID()), -- Prefixes the randomly generated order_id with 'ORD_I_'
            UUID(), -- Generates a random customer_id
            CASE
                WHEN RAND() < 0.3 THEN 'delivered'
                WHEN RAND() < 0.6 THEN 'shipped'
                ELSE 'processing'
            END, -- Random order status
            NOW() - INTERVAL FLOOR(RAND() * 365) DAY, -- Random order purchase date within the last year
            NOW() - INTERVAL FLOOR(RAND() * 300) DAY, -- Random approved date within the last 300 days
            NOW() - INTERVAL FLOOR(RAND() * 200) DAY, -- Random carrier delivery date within the last 200 days
            NOW() - INTERVAL FLOOR(RAND() * 100) DAY, -- Random customer delivery date within the last 100 days
            NOW() + INTERVAL FLOOR(RAND() * 30) DAY -- Random estimated delivery date within the next 30 days
        );
        SET i = i + 1;
    END WHILE;

    -- Re-enable foreign key checks
    SET FOREIGN_KEY_CHECKS = 1;
END//
DELIMITER ;

CALL InsertRandomOrders();

  select count(*) from ecom_orders eo;

  • Customer city update: update the customers' city names by adding the prefix "CITY_"

    Please execute the following script to add the prefix to customer_city in the ecom_customers table:

DELIMITER //
CREATE PROCEDURE UpdateCustomerCity()
BEGIN
    -- Disable autocommit
    SET autocommit = 0;

    -- Update customer_city by adding the prefix 'CITY_' for the specified customer_ids
    UPDATE ECommerceData.ecom_customers
    SET customer_city = CONCAT('CITY_', customer_city)
    WHERE customer_id IN (
        '00012a2ce6f8dcda20d059ce98491703',
        '000161a058600d5901f007fab4c27140',
        '0001fd6190edaaf884bcaf3d49edf079',
        '0002414f95344307404f0ace7a26f1d5',
        '000379cdec625522490c315e70c7a9fb',
        '0004164d20a9e969af783496f3408652',
        '000419c5494106c306a97b5635748086',
        '00046a560d407e99b969756e0b10f282',
        '00050bf6e01e69d5c0fd612f1bcfb69c',
        '000598caf2ef4117407665ac33275130'
    );

    -- Commit the transaction to save the changes
    COMMIT;

    -- Re-enable autocommit
    SET autocommit = 1;
END //
DELIMITER ;

CALL UpdateCustomerCity();
  • Run the query below to observe updates in ecom_customers table

select * from ecom_customers eo
where customer_id IN (
    '00012a2ce6f8dcda20d059ce98491703',
    '000161a058600d5901f007fab4c27140',
    '0001fd6190edaaf884bcaf3d49edf079',
    '0002414f95344307404f0ace7a26f1d5',
    '000379cdec625522490c315e70c7a9fb',
    '0004164d20a9e969af783496f3408652',
    '000419c5494106c306a97b5635748086',
    '00046a560d407e99b969756e0b10f282',
    '00050bf6e01e69d5c0fd612f1bcfb69c',
    '000598caf2ef4117407665ac33275130'
);

  • Changes to order items

    Price value is 21.9 in order_items table where order_id = '00048cc3ae777c65dbb7d2a0634bc1ea'

  • Update the price by adding 40 in the ecom_order_items table where the price is less than 200

DELIMITER //
CREATE PROCEDURE UpdatePrices()
BEGIN
    -- Update price by adding 40 where price is less than 200
    UPDATE ECommerceData.ecom_order_items
    SET price = price + 40
    WHERE price < 200;

    -- Commit the changes
    COMMIT;
END //
DELIMITER ;

CALL UpdatePrices();

  Run the select query below to confirm that the price update script worked

select * from ECommerceData.ecom_order_items where order_id = '00048cc3ae777c65dbb7d2a0634bc1ea'

  • Observe how the order view reflects the changes above

    Run this query to check the total number of orders in MongoDB, which should now be 99,451, as we added 10 new records. Before adding the records through the script, the total was 99,441.

  • Verify the updates in the customer_info document within the MongoDB wide collection.

db.orderSingleView.findOne({"customer_info.customer_id": "00012a2ce6f8dcda20d059ce98491703"})

  • Verify the updates in the order_items array within the MongoDB wide collection.

db.orderSingleView.findOne({  "order_items.order_id": '00048cc3ae777c65dbb7d2a0634bc1ea'});


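TapFlow is a programming framework, currently in Preview. It lets you perform real-time data replication, data processing, and materialized view creation. It consists of a set of APIs, a Python SDK, and Tap CLI (a command-line utility). Compared with common real-time data pipeline or integration approaches (such as Kafka ETL), Tap Flow has these advantages:

  1. CDC is built in, so no separate module is needed

  2. Data processing logic is written in Python / JS scripts, so requirements can be implemented quickly: 12 lines of code versus nearly 200 with Kafka

  3. Most mainstream Chinese domestic databases are supported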







