写点什么

万字带你深入阿里开源的 Canal 工作原理

发布于: 2021 年 01 月 20 日
万字带你深入阿里开源的Canal工作原理

前言

上篇文章给大家讲解了如何安装一个 Canal,以及讲解了一部分的原理,今天我们就来深度聊一聊 Canal 的工作流程,以及他是怎么工作的,以及架构师怎样的。

首先我们深度了解 Canal 时必须深度了解了一下 MySQL 主从复制原理。


一、MySQL 主从复制

MySQL 主备复制原理


  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件 log events,可以通过 show binlog events 进行查看)

  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)

  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据,以此来达到数据一致。


MySQL 的 binLog

         它记录了所有的 DDL 和 DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间。主要用来备份和数据同步。binlog 有三种: STATEMENTROWMIXED


  • STATEMENT 记录的是执行的 sql 语句

  • ROW 记录的是真实的行数据记录

  • MIXED 记录的是 1+2,优先按照 1 的模式记录


名词解释

什么是中继日志


从服务器 I/O 线程将主服务器的二进制日志读取过来记录到从服务器本地文件,然后从服务器 SQL 线程会读取 relay-log 日志的内容并应用到从服务器,从而使从服务器和主服务器的数据保持一致


二、Canal 架构

  • server 代表一个 canal 运行实例,对应于一个 jvm

  • instance 对应于一个数据队列 (1 个 canal server 对应 1..n 个 instance )

  • instance 下的子模块

- eventParser: 数据源接入,模拟 slave 协议和 master 进行交互,协议解析

- eventSink: Parser 和 Store 链接器,进行数据过滤,加工,分发的工作

- eventStore: 数据存储

- metaManager: 增量订阅 & 消费信息管理器


EventParser 在向 MySQL 发送 dump 命令之前会先从 Log Position 中获取上次解析成功的位置(如果是第一次启动,则获取初始指定位置或者当前数据段 binlog 位点)。mysql 接受到 dump 命令后,由 EventParser 从 mysql 上 pull binlog 数据进行解析并传递给 EventSink(传递给 EventSink 模块进行数据存储,是一个阻塞操作,直到存储成功 ),传送成功之后更新 Log Position。流程图如下:

  • EventSink 起到一个类似 channel 的功能,可以对数据进行过滤、分发/路由(1:n)、归并(n:1)和加工。EventSink 是连接 EventParser 和 EventStore 的桥梁。

  • EventStore 实现模式是内存模式,内存结构为环形队列,由三个指针(Put、Get 和 Ack)标识数据存储和读取的位置。

  • MetaManager 是增量订阅 &消费信息管理器,增量订阅和消费之间的协议包括 get/ack/rollback,分别为:

- Message getWithoutAck(int batchSize),允许指定 batchSize,一次可以获取多条,每次返回的对象为 Message,包含的内容为:batch id[唯一标识]和 entries[具体的数据对象]

- void rollback(long batchId),顾名思义,回滚上次的 get 请求,重新获取数据。基于 get 获取的 batchId 进行提交,避免误操作

- void ack(long batchId),顾名思议,确认已经消费成功,通知 server 删除数据。基于 get 获取的 batchId 进行提交,避免误操作


三、server/client 交互协议

canal client与 canal server 之间是 C/S 模式的通信,客户端采用 NIO,服务端采用 Netty。canal server 启动后,如果没有 canal client,那么 canal server 不会去 mysql 拉取 binlog。

即 Canal 客户端主动发起拉取请求,服务端才会模拟一个 MySQL Slave 节点去主节点拉取 binlog。

通常 Canal 客户端是一个死循环,这样客户端一直调用 get 方法,服务端也就会一直拉取 binlog


BIO、NIO、AIO 的区别


IO 的方式通常分为几种,同步阻塞的BIO同步非阻塞的NIO异步非阻塞的AIO


同步阻塞 IO:在此种方式下,用户进程在发起一个 IO 操作以后,必须等待 IO 操作的完成,只有当真正完成了 IO 操作以后,用户进程才能运行。JAVA 传统的 IO 模型属于此种方式!

同步非阻塞 IO:在此种方式下,用户进程发起一个 IO 操作以后边可返回做其它事情,但是用户进程需要时不时的询问 IO 操作是否就绪,这就要求用户进程不停的去询问,从而引入不必要的 CPU 资源浪费。其中目前 JAVA 的 NIO 就属于同步非阻塞 IO。


异步阻塞 IO:此种方式下是指应用发起一个 IO 操作以后,不等待内核 IO 操作的完成,等内核完成 IO 操作以后会通知应用程序,这其实就是同步和异步最关键的区别,同步必须等待或者主动的去询问 IO 是否完成,那么为什么说是阻塞的呢?因为此时是通过 select 系统调用来完成的,而 select 函数本身的实现方式是阻塞的,而采用 select 函数有个好处就是它可以同时监听多个文件句柄,从而提高系统的并发性!


异步非阻塞 IO:在此种模式下,用户进程只需要发起一个 IO 操作然后立即返回,等 IO 操作真正的完成以后,应用程序会得到 IO 操作完成的通知,此时用户进程只需要对数据进行处理就好了,不需要进行实际的 IO 读写操作,因为真正的 IO 读取或者写入操作已经由内核完成了。目前 Java 中还没有支持此种 IO 模型。


         canal client 与 canal server 之间属于增量订阅/消费,流程图如下:(其中 C 端是 canal client,S 端是 canal server)

canal client 调用connect()方法时,发送的数据包(PacketType)类型为:


  1. handshake

  2. ClientAuthentication


canal client 调用subscribe()方法,类型为[subscription]。

对应服务端采用 netty 处理 RPC 请求(CanalServerWithNetty):


public class CanalServerWithNetty extends AbstractCanalLifeCycle implements CanalServer {    public void start() {        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {            public ChannelPipeline getPipeline() throws Exception {                ChannelPipeline pipelines = Channels.pipeline();                pipelines.addLast(FixedHeaderFrameDecoder.class.getName(), new FixedHeaderFrameDecoder());                // 处理客户端的HANDSHAKE请求                pipelines.addLast(HandshakeInitializationHandler.class.getName(),                    new HandshakeInitializationHandler(childGroups));                // 处理客户端的CLIENTAUTHENTICATION请求                pipelines.addLast(ClientAuthenticationHandler.class.getName(),                    new ClientAuthenticationHandler(embeddedServer));
// 处理客户端的会话请求,包括SUBSCRIPTION,GET等 SessionHandler sessionHandler = new SessionHandler(embeddedServer); pipelines.addLast(SessionHandler.class.getName(), sessionHandler); return pipelines; } }); }}
复制代码

ClientAuthenticationHandler 处理鉴权后,会移除 HandshakeInitializationHandler 和ClientAuthenticationHandler

最重要的是会话处理器SessionHandler


以 client 发送 GET,server 从 mysql 得到 binlog 后,返回 MESSAGES 给 client 为例,说明 client 和 server 的 rpc 交互过程:


SimpleCanalConnector 发送GET请求,并读取响应结果的流程:

public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {    waitClientRunning();    int size = (batchSize <= 0) ? 1000 : batchSize;    long time = (timeout == null || timeout < 0) ? -1 : timeout; // -1代表不做timeout控制    if (unit == null) unit = TimeUnit.MILLISECONDS;  //默认是毫秒
// client发送GET请求 writeWithHeader(Packet.newBuilder() .setType(PacketType.GET) .setBody(Get.newBuilder() .setAutoAck(false) .setDestination(clientIdentity.getDestination()) .setClientId(String.valueOf(clientIdentity.getClientId())) .setFetchSize(size) .setTimeout(time) .setUnit(unit.ordinal()) .build() .toByteString()) .build() .toByteArray()); // client获取GET结果 return receiveMessages();}
private Message receiveMessages() throws IOException { // 读取server发送的数据包 Packet p = Packet.parseFrom(readNextPacket()); switch (p.getType()) { case MESSAGES: { Messages messages = Messages.parseFrom(p.getBody()); Message result = new Message(messages.getBatchId()); for (ByteString byteString : messages.getMessagesList()) { result.addEntry(Entry.parseFrom(byteString)); } return result; } }}
复制代码

服务端 SessionHandler 处理客户端发送的GET请求流程:

case GET:    // 读取客户端发送的数据包,封装为Get对象    Get get = CanalPacket.Get.parseFrom(packet.getBody());    // destination表示canal instance    if (StringUtils.isNotEmpty(get.getDestination()) && StringUtils.isNotEmpty(get.getClientId())) {        clientIdentity = new ClientIdentity(get.getDestination(), Short.valueOf(get.getClientId()));        Message message = null;        if (get.getTimeout() == -1) {// 是否是初始值            message = embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize());        } else {            TimeUnit unit = convertTimeUnit(get.getUnit());            message = embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize(), get.getTimeout(), unit);        }        // 设置返回给客户端的数据包类型为MESSAGES           Packet.Builder packetBuilder = CanalPacket.Packet.newBuilder();        packetBuilder.setType(PacketType.MESSAGES);        // 构造Message        Messages.Builder messageBuilder = CanalPacket.Messages.newBuilder();        messageBuilder.setBatchId(message.getId());        if (message.getId() != -1 && !CollectionUtils.isEmpty(message.getEntries())) {            for (Entry entry : message.getEntries()) {                messageBuilder.addMessages(entry.toByteString());            }        }        packetBuilder.setBody(messageBuilder.build().toByteString());        // 输出数据,返回给客户端        NettyUtils.write(ctx.getChannel(), packetBuilder.build().toByteArray(), null);    }
复制代码

具体的网络协议格式,可参见:CanalProtocol.proto


get/ack/rollback 协议介绍:


  • Message getWithoutAck(int batchSize)

- 允许指定 batchSize,一次可以获取多条,每次返回的对象为 Message,包含的内容为:

- batch id 唯一标识

- entries 具体的数据对象,对应的数据对象格式:EntryProtocol.proto

  • getWithoutAck(int batchSize, Long timeout, TimeUnit unit)

- 相比于 getWithoutAck(int batchSize),允许设定获取数据的 timeout 超时时间

- 拿够 batchSize 条记录或者超过 timeout 时间

- timeout=0,阻塞等到足够的 batchSize

  • void rollback(long batchId)

- 回滚上次的 get 请求,重新获取数据。基于 get 获取的 batchId 进行提交,避免误操作

  • void ack(long batchId)

- 确认已经消费成功,通知 server 删除数据。基于 get 获取的 batchId 进行提交,避免误操作


EntryProtocol.protod 对应的 canal 消息结构如下:

Entry      Header          logfileName [binlog文件名]          logfileOffset [binlog position]          executeTime [binlog里记录变更发生的时间戳,精确到秒]          schemaName           tableName          eventType [insert/update/delete类型]      entryType   [事务头BEGIN/事务尾END/数据ROWDATA]      storeValue  [byte数据,可展开,对应的类型为RowChange]        RowChange      isDdl       [是否是ddl变更操作,比如create table/drop table]      sql         [具体的ddl sql]      rowDatas    [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理]          beforeColumns [Column类型的数组,变更前的数据字段]          afterColumns [Column类型的数组,变更后的数据字段]            Column       index             sqlType     [jdbc type]      name        [column name]      isKey       [是否为主键]      updated     [是否发生过变更]      isNull      [值是否为null]      value       [具体的内容,注意为string文本]
复制代码

SessionHandler 中服务端处理客户端的其他类型请求,都会调用CanalServerWithEmbedded的相关方法:

case SUBSCRIPTION:        Sub sub = Sub.parseFrom(packet.getBody());        embeddedServer.subscribe(clientIdentity);case GET:        Get get = CanalPacket.Get.parseFrom(packet.getBody());        message = embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize());case CLIENTACK:        ClientAck ack = CanalPacket.ClientAck.parseFrom(packet.getBody());        embeddedServer.ack(clientIdentity, ack.getBatchId());case CLIENTROLLBACK:        ClientRollback rollback = CanalPacket.ClientRollback.parseFrom(packet.getBody());        embeddedServer.rollback(clientIdentity);// 回滚所有批次
复制代码

所以真正的处理逻辑在CanalServerWithEmbedded中,下面重点来了。。。


3.1 CanalServerWithEmbedded

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; CanalServer 包含多个 Instance,它的成员变量canalInstances记录了 instance 名称与实例的映射关系。

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 因为是一个 Map,所以同一个 Server 不允许出现相同 instance 名称(本例中实例名称为 example),比如不能同时有两个 example 在一个 server 上。但是允许一个 Server 上有 example1 和 example2。


注意:CanalServer中最重要的是CanalServerWithEmbedded,而 CanalServerWithEmbedded 中最重要的是CanalInstance


&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 下图表示一个 server 配置了两个 Canal 实例(instance),每个 Client 连接一个 Instance。

每个 Canal 实例模拟为一个 MySQL 的 slave,所以每个 Instance 的 slaveId 必须不一样。

比如图中两个 Instance 的 id 分别是 1234 和 1235,它们都会拉取 MySQL 主节点的 binlog。

这里每个 Canal Client 都对应一个 Instance,每个 Client 在启动时,

都会指定一个 Destination,这个 Destination 就表示 Instance 的名称。

所以 CanalServerWithEmbedded 处理各种请求时的参数都有 ClientIdentity,

从 ClientIdentity 中获取 destination,就可以获取出对应的 CanalInstance。


理解下各个组件的对应关系:

  • Canal Client 通过 destination 找出 Canal Server 中对应的 Canal Instance。

  • 一个 Canal Server 可以配置多个 Canal Instances。


下面以 CanalServerWithEmbedded 的订阅方法为例:


  1. 根据客户端标识获取 CanalInstance

  2. 向 CanalInstance 的元数据管理器订阅当前客户端

  3. 从元数据管理中获取客户端的游标

  4. 通知 CanalInstance 订阅关系发生变化


注意:提供订阅方法的作用是:MySQL 新增了一张表,客户端原先没有同步这张表,现在需要同步,所以需要重新订阅。

public void subscribe(ClientIdentity clientIdentity) throws CanalServerException {
// ClientIdentity表示Canal Client客户端,从中可以获取出客户端指定连接的Destination
// 由于CanalServerWithEmbedded记录了每个Destination对应的Instance,可以获取客户端对应的Instance
CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
if (!canalInstance.getMetaManager().isStart()) {
canalInstance.getMetaManager().start(); // 启动Instance的元数据管理器
}
canalInstance.getMetaManager().subscribe(clientIdentity); // 执行一下meta订阅
Position position = canalInstance.getMetaManager().getCursor(clientIdentity);
if (position == null) {
position = canalInstance.getEventStore().getFirstPosition();// 获取一下store中的第一条
if (position != null) {
canalInstance.getMetaManager().updateCursor(clientIdentity, position); // 更新一下cursor
}
}
// 通知下订阅关系变化
canalInstance.subscribeChange(clientIdentity);
}
复制代码


每个CanalInstance中包括了四个组件:**EventParser、EventSink、EventStore、MetaManager**。
服务端主要的处理方法包括get/ack/rollback,这三个方法都会用到Instance上面的几个内部组件,主要还是EventStore和MetaManager:
在这之前,要先理解EventStore的含义,EventStore是一个RingBuffer,有三个指针:**Put、Get、Ack**。
- Put: Canal Server从MySQL拉取到数据后,放到内存中,Put增加- Get: 消费者(Canal Client)从内存中消费数据,Get增加- Ack: 消费者消费完成,Ack增加。并且会删除Put中已经被Ack的数据
这三个操作与Instance组件的关系如下:![](https://img-blog.csdnimg.cn/20210120154723227.png? )客户端通过canal server获取mysql binlog有几种方式(get方法和getWithoutAck):
- 如果timeout为null,则采用tryGet方式,即时获取- 如果timeout不为null 1. timeout为0,则采用get阻塞方式,获取数据,不设置超时,直到有足够的batchSize数据才返回 2. timeout不为0,则采用get+timeout方式,获取数据,超时还没有batchSize足够的数据,有多少返回多少```javaprivate Events<Event> getEvents(CanalEventStore eventStore, Position start, int batchSize, Long timeout, TimeUnit unit) { if (timeout == null) { return eventStore.tryGet(start, batchSize); // 即时获取 } else if (timeout <= 0){ return eventStore.get(start, batchSize); // 阻塞获取 } else { return eventStore.get(start, batchSize, timeout, unit); // 异步获取 }}
复制代码

注意:EventStore 的实现采用了类似 Disruptor 的 RingBuffer 环形缓冲区。RingBuffer 的实现类是 MemoryEventStoreWithBuffer


get 方法和 getWithoutAck 方法的区别是:

  • get 方法会立即调用 ack

  • getWithoutAck 方法不会调用 ack

3.2 *EventStore*


以 10 条数据为例,初始时 current=-1,第一个元素起始 next=0,end=9,循环[0,9]所有元素。

List 元素为(A,B,C,D,E,F,G,H,I,J)

第一批 10 个元素 put 完成后,putSequence 设置为 end=9。假设第二批又 Put 了 5 个元素:(K,L,M,N,O)


current=9,起始 next=9+1=10,end=9+5=14,在 Put 完成后,putSequence 设置为 end=14。

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 这里假设环形缓冲区的最大大小为 15 个(源码中是 16MB),那么上面两批一共产生了 15 个元素,刚好填满了环形缓冲区。

如果又有 Put 事件进来,由于环形缓冲区已经满了,没有可用的 slot,则 Put 操作会被阻塞,直到被消费掉。


下面是 Put 填充环形缓冲区的代码,检查可用 slot(checkFreeSlotAt 方法)在几个 put 方法中。


public class MemoryEventStoreWithBuffer extends AbstractCanalStoreScavenge implements CanalEventStore<Event>, CanalStoreScavenge {    private static final long INIT_SQEUENCE = -1;    private int               bufferSize    = 16 * 1024;    private int               bufferMemUnit = 1024;                         // memsize的单位,默认为1kb大小    private int               indexMask;    private Event[]           entries;
// 记录下put/get/ack操作的三个下标 private AtomicLong putSequence = new AtomicLong(INIT_SQEUENCE); // 代表当前put操作最后一次写操作发生的位置 private AtomicLong getSequence = new AtomicLong(INIT_SQEUENCE); // 代表当前get操作读取的最后一条的位置 private AtomicLong ackSequence = new AtomicLong(INIT_SQEUENCE); // 代表当前ack操作的最后一条的位置
// 启动EventStore时,创建指定大小的缓冲区,Event数组的大小是16*1024 // 也就是说算个数的话,数组可以容纳16000个事件。算内存的话,大小为16MB public void start() throws CanalStoreException { super.start(); indexMask = bufferSize - 1; entries = new Event[bufferSize]; }
// EventParser解析后,会放入内存中(Event数组,缓冲区) private void doPut(List<Event> data) { long current = putSequence.get(); // 取得当前的位置,初始时为-1,第一个元素为-1+1=0 long end = current + data.size(); // 最末尾的位置,假设Put了10条数据,end=-1+10=9 // 先写数据,再更新对应的cursor,并发度高的情况,putSequence会被get请求可见,拿出了ringbuffer中的老的Entry值 for (long next = current + 1; next <= end; next++) { entries[getIndex(next)] = data.get((int) (next - current - 1)); } putSequence.set(end); } }
复制代码

Put 是生产数据,Get 是消费数据,Get 一定不会超过 Put。比如 Put 了 10 条数据,Get 最多只能获取到 10 条数据。但有时候为了保证 Get 处理的速度,Put 和 Get 并不会相等。

可以把 Put 看做是生产者,Get 看做是消费者。生产者速度可以很快,消费者则可以慢慢地消费。比如 Put 了 1000 条,而 Get 我们只需要每次处理 10 条数据。


仍然以前面的示例来说明 Get 的流程,初始时 current=-1,假设 Put 了两批数据一共 15 条,maxAbleSequence=14,而 Get 的 BatchSize 假设为 10。

初始时 next=current=-1,end=-1。通过 startPosition,会设置 next=0。最后 end 又被赋值为 9,即循环缓冲区[0,9]一共 10 个元素。


private Events<Event> doGet(Position start, int batchSize) throws CanalStoreException {    LogPosition startPosition = (LogPosition) start;
long current = getSequence.get(); long maxAbleSequence = putSequence.get(); long next = current; long end = current; // 如果startPosition为null,说明是第一次,默认+1处理 if (startPosition == null || !startPosition.getPostion().isIncluded()) { // 第一次订阅之后,需要包含一下start位置,防止丢失第一条记录 next = next + 1; }
end = (next + batchSize - 1) < maxAbleSequence ? (next + batchSize - 1) : maxAbleSequence; // 提取数据并返回 for (; next <= end; next++) { Event event = entries[getIndex(next)]; if (ddlIsolation && isDdl(event.getEntry().getHeader().getEventType())) { // 如果是ddl隔离,直接返回 if (entrys.size() == 0) { entrys.add(event);// 如果没有DML事件,加入当前的DDL事件 end = next; // 更新end为当前 } else { // 如果之前已经有DML事件,直接返回了,因为不包含当前next这记录,需要回退一个位置 end = next - 1; // next-1一定大于current,不需要判断 } break; } else { entrys.add(event); } } // 处理PositionRange,然后设置getSequence为end getSequence.compareAndSet(current, end)}
复制代码

ack 操作的上限是 Get,假设 Put 了 15 条数据,Get 了 10 条数据,最多也只能 Ack10 条数据。Ack 的目的是清空缓冲区中已经被 Get 过的数据

public void ack(Position position) throws CanalStoreException {    cleanUntil(position);}
public void cleanUntil(Position position) throws CanalStoreException { long sequence = ackSequence.get(); long maxSequence = getSequence.get();
boolean hasMatch = false; long memsize = 0; for (long next = sequence + 1; next <= maxSequence; next++) { Event event = entries[getIndex(next)]; memsize += calculateSize(event); boolean match = CanalEventUtils.checkPosition(event, (LogPosition) position); if (match) {// 找到对应的position,更新ack seq hasMatch = true;
if (batchMode.isMemSize()) { ackMemSize.addAndGet(memsize); // 尝试清空buffer中的内存,将ack之前的内存全部释放掉 for (long index = sequence + 1; index < next; index++) { entries[getIndex(index)] = null;// 设置为null } }
ackSequence.compareAndSet(sequence, next) } }}
复制代码

rollback 回滚方法的实现则比较简单,将 getSequence 回退到 ack 位置。

public void rollback() throws CanalStoreException {    getSequence.set(ackSequence.get());    getMemSize.set(ackMemSize.get());}
复制代码

下图展示了 RingBuffer 的几个操作示例:

3.3 EventParser WorkFlow

EventStore 负责存储解析后的 Binlog 事件,而解析动作负责拉取 Binlog,它的流程比较复杂。需要和 MetaManager 进行交互。

比如要记录每次拉取的 Position,这样下一次就可以从上一次的最后一个位置继续拉取。所以 MetaManager 应该是有状态的。


EventParser 的流程如下:


  1. Connection 获取上一次解析成功的位置 (如果第一次启动,则获取初始指定的位置或者是当前数据库的 binlog 位点)

  2. Connection 建立链接,发送 BINLOG_DUMP 指令

  3. Mysql 开始推送 Binaly Log

  4. 接收到的 Binaly Log 的通过 Binlog parser 进行协议解析,补充一些特定信息

  5. 传递给 EventSink 模块进行数据存储,是一个阻塞操作,直到存储成功

  6. 存储成功后,定时记录 Binaly Log 位置

总结

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 上述我们讲了一些架构和一些交互模式,和比较多原理,做为一名优秀的程序员不能只单纯的会使用,而是多去了解他的思想和为什么这么写,这样你的代码能力才一天比一天强。我在这里为大家提供大数据的资源需要的朋友可以去下面 GitHub 去下载,信自己,努力和汗水总会能得到回报的。我是大数据老哥,我们下期见~~~

>资源获取 获取 Flink 面试题,Spark 面试题,程序员必备软件,hive 面试题,Hadoop 面试题,Docker 面试题,简历模板等资源请去

>GitHub 自行下载 https://github.com/lhh2002/Framework-Of-BigData

>Gitee 自行下载 https://gitee.com/liheyhey/dashboard/projects

>实时数仓代码 GitHub: https://github.com/lhh2002/RealTimeData_WareHouse


用户头像

微信搜公众号【大数据老哥】 2021.01.03 加入

微信搜索公众号【大数据老哥】 自己GitHub【https://github.com/lhh2002】 欢迎来star

评论

发布
暂无评论
万字带你深入阿里开源的Canal工作原理