TiDB+FLINK 实时计算
- 2022 年 7 月 11 日
本文字数:10270 字
阅读完需:约 34 分钟
作者: majiajue 原文来源:https://tidb.net/blog/8768bcd8
1. 参照官网配置 TiCDC 具体配置如下
# 指定配置文件中涉及的库名、表名是否为大小写敏感
# 该配置会同时影响 filter 和 sink 相关配置,默认为 true
case-sensitive = true
# 是否输出 old value,从 v4.0.5 开始支持
enable-old-value = true
[filter]
# 忽略指定 start_ts 的事务
ignore-txn-start-ts = [1, 2]
# 过滤器规则
# 过滤规则语法:https://docs.pingcap.com/zh/tidb/stable/table-filter#表库过滤语法 指定了我的销售表
rules = ['dspdev.sales_order_header']
[mounter]
# mounter 线程数,用于解码 TiKV 输出的数据
worker-num = 16
[sink]
# 对于 MQ 类的 Sink,可以通过 dispatchers 配置 event 分发器
# 支持 default、ts、rowid、table 四种分发器,分发规则如下:
# - default:有多个唯一索引(包括主键)时按照 table 模式分发;只有一个唯一索引(或主键)按照 rowid 模式分发;如果开启了 old value 特性,按照 table 分发
# - ts:以行变更的 commitTs 做 Hash 计算并进行 event 分发
# - rowid:以所选的 HandleKey 列名和列值做 Hash 计算并进行 event 分发
# - table:以表的 schema 名和 table 名做 Hash 计算并进行 event 分发
# matcher 的匹配语法和过滤器规则语法相同
dispatchers = [
{matcher = ['dspdev.*'], dispatcher = "ts"}
]
# 对于 MQ 类的 Sink,可以指定消息的协议格式
# 目前支持 default、canal、avro 和 maxwell 四种协议。default 为 TiCDC Open Protocol
protocol = "canal"
[cyclic-replication]
# 是否开启环形同步
enable = false
# 当前 TiCDC 的复制 ID
replica-id = 1
# 需要过滤掉的同步 ID
filter-replica-ids = [2,3]
# 是否同步 DDL
sync-ddl = true
2 cdc sink 配置下游为 kafka
--sink-uri="kafka://127.0.0.1:9092/cdc-test?kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1"
这样就会将 tidb cdc 数据以 protobuf 数据发完 kafka, 我们只需要在下游做解析就好
具体配置解释参考:tidb 配置连接
3 新建 spring boot 项目 引入 canal-client,kafka 等配置
pom 引入如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.konka.dsp</groupId>
<artifactId>kafka-parse</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafka-parse</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>11</java.version>
<fastjson.version>1.2.70</fastjson.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.springframework.cloud</groupId>-->
<!-- <artifactId>spring-cloud-starter</artifactId>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.springframework.cloud</groupId>-->
<!-- <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>-->
<!-- </dependency>-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
properties 如下:
###########【Kafka集群】###########
spring.kafka.bootstrap-servers=192.168.8.71:9092
###########【初始化生产者配置】###########
# 重试次数
spring.kafka.producer.retries=0
# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
spring.kafka.producer.acks=1
# 批量大小
spring.kafka.producer.batch-size=16384
# 提交延时
spring.kafka.producer.properties.linger.ms=0
# 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
# linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
?
# 生产端缓冲区大小
spring.kafka.producer.buffer-memory = 33554432
# Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 自定义分区器
# spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner
###########【初始化消费者配置】###########
# 默认的消费组ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
# 是否自动提交offset
spring.kafka.consumer.enable-auto-commit=true
# 提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=latest
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=120000
# 消费请求超时时间
spring.kafka.consumer.properties.request.timeout.ms=180000
# Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=com.alibaba.otter.canal.client.kafka.MessageDeserializer
# 消费端监听的topic不存在时,项目启动会报错(关掉)
spring.kafka.listener.missing-topics-fatal=false
#过滤table和字段
table.data = {"sales_order_header":"id,customer_name,total_amount,created_date"}
# 设置批量消费
# spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
sprint boot kafka 消费端代码如下:
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.alibaba.otter.canal.protocol.Message;
import com.konka.dsp.kafkaparse.CanalKafkaClientExample;
import com.konka.dsp.kafkaparse.tidb.KafkaMessage;
import com.konka.dsp.kafkaparse.tidb.TicdcEventData;
import com.konka.dsp.kafkaparse.tidb.TicdcEventDecoder;
import com.konka.dsp.kafkaparse.tidb.TicdcEventFilter;
import com.konka.dsp.kafkaparse.tidb.value.TicdcEventDDL;
import com.konka.dsp.kafkaparse.tidb.value.TicdcEventResolve;
import com.konka.dsp.kafkaparse.tidb.value.TicdcEventRowChange;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@Component
public class kafkaConsumer {
protected final static Logger logger = LoggerFactory.getLogger(CanalKafkaClientExample.class);
// 消费监听
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Value("#{${table.data}}")
private Map<String,String> map;
@KafkaListener(topics = {"cdc-test"})
public void onMessage1(ConsumerRecord<String, Message> consumerRecord) throws UnsupportedEncodingException {
Message message = consumerRecord.value();
long batchId = message.getId();
FlatMessage fm = new FlatMessage();
List<CanalEntry.Entry> entrys = message.getEntries();
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChage = null;
try {
rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
fm.setId(entry.getHeader().getExecuteTime());
fm.setDatabase(entry.getHeader().getSchemaName());
fm.setEs(entry.getHeader().getExecuteTime());
fm.setTs(entry.getHeader().getExecuteTime());
fm.setTable(entry.getHeader().getTableName());
fm.setType(rowChage.getEventType().name());
CanalEntry.EventType eventType = rowChage.getEventType();
fm.setIsDdl(rowChage.getIsDdl());
fm.setSql(rowChage.getSql());
Map<String,String> mysqlTypes = new HashMap<>();
Map<String,Integer> sqlType = new HashMap<>();
List<String> pkNames = new ArrayList<>();
logger.info(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
String[] filtercolumn = map.get(entry.getHeader().getTableName()).split(",");
logger.info(" filter --> column {}",filtercolumn);
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
if (eventType == CanalEntry.EventType.DELETE) {
fm.setData(saveRowData(rowData.getBeforeColumnsList(),pkNames,filtercolumn));
fm.setMysqlType(setMysqlTypes(rowData.getBeforeColumnsList(),filtercolumn));
fm.setSqlType(setSqlTypes(rowData.getBeforeColumnsList(),filtercolumn));
} else if (eventType == CanalEntry.EventType.INSERT) {
fm.setData(saveRowData(rowData.getAfterColumnsList(),pkNames,filtercolumn));
fm.setMysqlType(setMysqlTypes(rowData.getAfterColumnsList(),filtercolumn));
fm.setSqlType(setSqlTypes(rowData.getAfterColumnsList(),filtercolumn));
} else {
logger.info("-------> before->{}",rowData.getBeforeColumnsList().size());
fm.setOld(saveRowData(rowData.getBeforeColumnsList(),pkNames,filtercolumn));
logger.info("-------> after");
fm.setData(saveRowData(rowData.getAfterColumnsList(),pkNames,filtercolumn));
fm.setMysqlType(setMysqlTypes(rowData.getAfterColumnsList(),filtercolumn));
fm.setSqlType(setSqlTypes(rowData.getAfterColumnsList(),filtercolumn));
if(rowData.getBeforeColumnsList().size()==0&&rowData.getAfterColumnsList().size()>0){
fm.setType("INSERT");
}
}
}
HashSet h = new HashSet(pkNames);
pkNames.clear();
pkNames.addAll(h);
fm.setPkNames(pkNames);
}
logger.info("json解析:{}",JSON.toJSONString(fm, SerializerFeature.WriteMapNullValue));
kafkaTemplate.send("canal-data",JSON.toJSONString(fm, SerializerFeature.WriteMapNullValue));
//
// FlatMessage flatMessage = (FlatMessage)JSON.parseObject(flatMessageJson, FlatMessage.class);
// 消费的哪个topic、partition的消息,打印出消息内容
// KafkaMessage kafkaMessage = new KafkaMessage();
// kafkaMessage.setKey(consumerRecord.key());
// kafkaMessage.setValue(consumerRecord.value());
// kafkaMessage.setOffset(consumerRecord.offset());
// kafkaMessage.setPartition(consumerRecord.partition());
// kafkaMessage.setTimestamp(consumerRecord.timestamp());
// TicdcEventFilter filter = new TicdcEventFilter();
// TicdcEventDecoder ticdcEventDecoder = new TicdcEventDecoder(kafkaMessage);
// while (ticdcEventDecoder.hasNext()) {
// TicdcEventData data = ticdcEventDecoder.next();
// if (data.getTicdcEventValue() instanceof TicdcEventRowChange) {
// boolean ok = filter.check(data.getTicdcEventKey().getTbl(), data.getTicdcEventValue().getKafkaPartition(), data.getTicdcEventKey().getTs());
// if (ok) {
// // deal with row change event
// } else {
// // ignore duplicated messages
// }
// } else if (data.getTicdcEventValue() instanceof TicdcEventDDL) {
// // deal with ddl event
// } else if (data.getTicdcEventValue() instanceof TicdcEventResolve) {
// filter.resolveEvent(data.getTicdcEventValue().getKafkaPartition(), data.getTicdcEventKey().getTs());
// // deal with resolve event
// }
// System.out.println(JSON.toJSONString(data, true));
// }
}
private List<Map<String,String>> saveRowData(List<CanalEntry.Column> columns,List<String> pkNames,String[] filter) {
Map map = new HashMap<>();
List<Map<String,String>> rowdata = new ArrayList<>();
columns.forEach(column -> {
if(column.hasIsKey()){
pkNames.add(column.getName());
}
if(Arrays.asList(filter).contains(column.getName())){
map.put(column.getName(),column.getValue().equals("")?"NULL":column.getValue());
}
//防止flink接收""报错
});
rowdata.add(map);
return rowdata;
// rabbitTemplate.convertAndSend(tableEventType.toUpperCase(),JSON.toJSONString(map));
}
private Map<String,String> setMysqlTypes(List<CanalEntry.Column> columns,String[] filter){
Map<String,String> map = new HashMap<>();
columns.forEach(column -> {
if(Arrays.asList(filter).contains(column.getName())){
map.put(column.getName(),column.getMysqlType());
}
});
return map;
}
private Map<String,Integer> setSqlTypes(List<CanalEntry.Column> columns,String[] filter){
Map<String,Integer> map = new HashMap<>();
columns.forEach(column -> {
if(Arrays.asList(filter).contains(column.getName())){
map.put(column.getName(),column.getSqlType());
}
});
return map;
}
private static void printColumn(List<CanalEntry.Column> columns) {
for (CanalEntry.Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
这里基本上将 tidb 的数据转化为 canal-json 格式数据,这里我们继续将转化后的数据发完 kafka, 以便 kafka 继续消费,这里有个点就是不知道为什么 tidb 出来的 insert 和 update eventtype 类型都是 UPDATE,所以我在代码做了判断没有 OLD 的话基本上就是 INSERT 了
4.flink 本地开发 建议下载搭建好环境参考 flink table 配置
把 table 相关 jar 包拷贝到 flink 下的 lib 目录下即可
这里的会用到另外一个知乎开源的相关包项目地址如下: https://github.com/pingcap-incubator/TiBigData/
把项目编译完成以后把 flink 相关 jar 包拷贝到 flink 下的 lib 下
5 最后在我们的相关业务库配置表这里我上代码了:
import org.apache.flink.api.java.DataSet;
import org.apache.flink.table.api.*;
import org.apache.flink.table.expressions.TimeIntervalUnit;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.*;
public class SalesOrderStream {
public static Table report(Table transactions) {
return transactions.select(
$("customer_name"),
$("created_date"),
$("total_amount"))
.groupBy($("customer_name"), $("created_date"))
.select(
$("customer_name"),
$("total_amount").sum().as("total_amount"),
$("created_date")
);
}
public static void main(String[] args) throws Exception {
EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
// tEnv.executeSql("CREATE TABLE sales_order_header_stream (\"n" +
//// " id BIGINT not null,\"n" +
// " customer_name STRING,\"n"+
//// " dsp_org_name STRING,\"n"+
// " total_amount DECIMAL(38,2),\"n" +
//// " total_discount DECIMAL(16,2),\"n" +
//// " pay_amount DECIMAL(16,2),\"n" +
//// " total_amount DECIMAL(16,2),\"n" +
// " created_date TIMESTAMP(3)\"n" +
// ") WITH (\"n" +
// " 'connector' = 'mysql-cdc',\"n" +
// " 'hostname' = '192.168.8.73',\"n" +
// " 'port' = '4000',\"n"+
// " 'username' = 'flink',\"n"+
// " 'password' = 'flink',\"n"+
// " 'database-name' = 'dspdev',\"n"+
// " 'table-name' = 'sales_order_header'\"n"+
// ")");
tEnv.executeSql("CREATE TABLE sales_order_header_stream (\"n" +
" `id` BIGINT,\"n"+
" `total_amount` DECIMAL(16,2) ,\"n"+
" `customer_name` STRING,\"n"+
" `created_date` TIMESTAMP(3) ,\"n"+
" PRIMARY KEY (`id`) NOT ENFORCED "+
") WITH (\"n" +
"'connector' = 'kafka',\"n"+
"'topic' = 'canal-data',\"n"+
"'properties.bootstrap.servers' = '192.168.8.71:9092',\"n"+
"'properties.group.id' = 'test',\"n"+
"'scan.startup.mode' = 'earliest-offset',\"n"+
"'format' = 'canal-json'\"n"+
")");
tEnv.executeSql("CREATE TABLE spend_report (\"n" +
" customer_name STRING,\"n" +
// " total_amount DECIMAL(16,2),\"n" +
// " total_discount DECIMAL(16,2),\"n" +
// " pay_amount DECIMAL(16,2),\"n" +
" total_amount DECIMAL(16,2),\"n" +
" created_date TIMESTAMP(3),\"n" +
" PRIMARY KEY (customer_name,created_date) NOT ENFORCED" +
") WITH (\"n" +
" 'connector' = 'tidb',\"n" +
" 'tidb.database.url' = 'jdbc:mysql://192.168.8.73:4000/dspdev',\"n" +
" 'tidb.username' = 'flink',\"n"+
" 'tidb.password' = 'flink',\"n"+
" 'tidb.database.name' = 'dspdev',\"n"+
" 'tidb.table.name' = 'spend_report'\"n"+
")");
Table transactions = tEnv.from("sales_order_header_stream");
report(transactions).executeInsert("spend_report");
}
}
这样在我数据库里面就可以实时统计当前的销售总价并写入数据库里,最后数据库数据如下:
以上就是机遇 TIDB 体系进行的数据实时统计
版权声明: 本文为 InfoQ 作者【TiDB 社区干货传送门】的原创文章。
原文链接:【http://xie.infoq.cn/article/ee59f048369d6ae5739410f18】。文章转载请联系作者。
TiDB 社区干货传送门
TiDB 社区官网:https://tidb.net/ 2021.12.15 加入
TiDB 社区干货传送门是由 TiDB 社区中布道师组委会自发组织的 TiDB 社区优质内容对外宣布的栏目,旨在加深 TiDBer 之间的交流和学习。一起构建有爱、互助、共创共建的 TiDB 社区 https://tidb.net/
评论