写点什么

架构实战 7 - 消息队列 MySql 表格设计

作者:
  • 2023-03-12
    广东
  • 本文字数:2902 字

    阅读完需:约 10 分钟

目的

设计消息队列存储消息数据的 MySQL 表格

表格设计

概念

  • Producer:向主题发布新消息的应用程序。

  • Consumer:从主题订阅新消息的应用程序。

  • ConsumerGroup:消费者组,多个消费者实例共同组成的一个组,同时消费多个分区以实现高吞吐量。

  • Broker:缓存代理,消息队列集群中的一台或多台服务器统称 broker。

  • Topic:消息队列处理资源的消息源(feeds of messages)的不同分类,主题是承载消息的逻辑容器,在实际使用中多用来区分具体的业务。

  • Partition(Queue):Topic 物理上的分组,一个 topic 可以分为多个 partion(queue),每个 partion(queue)是一个有序的队列。partion(queue)中每条消息都会被分配一个 有序的 Id(offset)。

  • Message:消息,是通信的基本单位,每个 producer 可以向一个 topic(主题)发布一些消息。

  • Offset:消息位移,表示分区中每条消息的位置信息,是一个单调递增且不变的值。

  • Consumer Offset:消费者位移,表征消费者消费进度,每个消费者都有自己的消费者位移。

  • Rebalance:重平衡,消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。

生产消费流程

Kafka


  1. 生产者程序通常持续不断地向一个或多个 Topic 主题发送消息。

  2. 和生产者类似,消费者也能够同时订阅多个 Topic 主题的消息。

  3. 可以同时运行多个生产者和消费者实例,这些实例会不断地向 消息队列集群中的多个主题生产和消费消息。

  4. Broker 即服务器端,负责接收和处理客户端发送过来的请求,以及对消息进行持久化(此处利用 MySql 进行存储)。

  5. 虽然多个 Broker 进程能够运行在同一台机器上,但更常见的做法是将不同的 Broker 分散运行在不同的机器上,这样如果集群中某一台机器宕机,即使在它上面运行的所有 Broker 进程都挂掉了,其他机器上的 Broker 也依然能够对外提供服务。

  6. 同时,倘若一个 Topic 积累了太多的数据以至于单台 Broker 机器都无法容纳了,此时应该怎么办呢?一个很自然的想法就是,能否把数据分割成多份保存在不同的 Broker 上,这种机制就是所谓的分区(Partitioning), MongoDB 和 Elasticsearch 中的 Sharding、HBase 中的 Region,其实它们都是相同的原理,只是 Partitioning 是最标准的名称。

  7. 可以让一个 Consumer 实例负责一个 Queue,这样消息处理既清晰又高效,如果后面 Consumer 增加或者减少,Queue 增加或者减少就需要重新分配了,这就是消息队列中经常提到的重平衡的一个概念。

存储分析

分区

我们对某个 Topic 进行消息发送,如果要进行持久化的话,很自然的能想到要以某个字段来分表就行,此处用 Topic 来进行分表,每个 Topic 写入的消息数据放在同一张表里面。


但是在实际的业务中,必定是有的 Topic 消息量会很大,有的会很少。对于消息量很大的,可能很短时间内就积累了很大的数据量,那么这个表必定越来越大,很快就会过负载。所以我们在设计数据存储的时候一定要考虑到如何将某个 Topic 下的数据均匀的存储到多张表里面去


RocketMQ


Kafka


此处我们可以借鉴一下开源的 MQ 是怎么设计的,以 RocketMQ 来说,一个 Topic 下的消息会存储到多个 Queue 中(也就相当于 Kafka 中的 Partition),也就是每个 Topic 下的 Queue 并不是一定的数量,而是可以在创建 Topic 时进行指定,所以我们在用表做存储的时候,也可以参考这个设计,本来每个 Topic 对应的消息量就不同,所以一张表也就相当于一个 Queue(Partition),用于存储消息内容


同时,我们可以记录 Topic 和 Queue 之间的关系,记录后有如下的作用:


  • 往某个 Topic 写入消息的时候,就知道这个 Topic 对应哪些 Queue(数据表),可以轮询的写入这些表,这样数据就比较平均。

  • 创建 Topic 的时候要预估一下数据量,然后指定分配多少个 Queue 用于存储。(后续也能修改 Queue 的数量,方便扩展)

  • 如果 Queue 分布在多个 Broker 的话,Consumer 的多个实例就可以连接不同的 Broker,大大提升了消息处理能力。


消费者组(以 Kafka 为例)

Consumer Group 是 Kafka 提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者或消费者实例(Consumer Instance),它们共享一个公共的 ID,这个 ID 被称为 Group ID。组内的所有消费者协调再一起来消费订阅主题(Subscribed Topics)的所有分区(Partition or Queue)。当然,每个分区只能由同一个消费者组内的一个 Consumer 实例来消费。消费者组有三个特性比较重要:


  1. Consumer Group 下可以有一个或多个 Consumer 实例。这里的实例可以是一个单独的进程,也可以是同一进程下的线程。在实际场景中,使用进程更为常见一些。

  2. Group ID 在一个 Kafka 集群中,它标识唯一的一个 Consumer Group。

  3. Consumer Group 下所有实例订阅的主题的单个分区,只能分配给组内的某个 Consumer 实例消费。这个分区当然也可以被其他的 Group 消费。


数据库表设计


Topic 表


Topic (TABLE)

id BIGINT(64) PK

topic_name VARCHAR(255)

broker_id BINARY(16)

create_time DATETIME(19)

update_time DATETIME(19)


  • 索引为 id、create_time、broker_id

  • 其中 broker_id 方便快速定位该 Topic 分布在哪些 Broker 上,提升效率。

Broker 表


Broker (TABLE)

id BIGINT(64) PK

broker_name VARCHAR(255)

create_time DATETIME(19)

update_time DATETIME(19)


  • 索引为 id、create_time

Queue 表


Queue (TABLE)

id BIGINT(64) PK

topic_id BIGINT(64) //该队列归属于哪个 Topic 下面

offset BIGINT(64) //消息位移,表示分区中每条消息的位置信息,是一个单调递增且不变的值。

create_time DATETIME(19)

update_time DATETIME(19)


  • 索引为 id、create_time、topic_id、offset

  • topic_id 可以很方便的用来统计某个 Topic 下面有哪些队列(分区),轮询写入时,可以提升效率

  • offset 方便插入新消息,同时,方便消费者消费时去定位到本次应该消费的位置。


Message 表


Message (Table)

id BIGINT(64) PK // Queue 表中的 offset

messages JSON //消息的具体内容

create_time DATETIME(19)

update_time DATETIME(19)


  • 索引为 id、create_time

  • 其中 id 方便用来定位查找具体的消息内容


Consumer 表


Consumer (TABLE)

id BIGINT(64) PK

group_id BIGINT(64)

name VARCHAR(255)

create_time DATETIME(19)

update_time DATETIME(19)


  • 索引为 id、create_time、group_id

  • 其中 group_id 方便用来定位该消费者属于哪个群组


ConsumerGroup 表


ConsumerGroup(TABLE)

id BIGINT(64) PK

leader_consumer_id BIGINT(64) //消费组中 leader consumer 的 id

name VARCHAR(255)

count INT(64) //消费组中消费者的数量

create_time DATETIME(19)

update_time DATETIME(19)


  • 索引为 id、create_time

ConsumerRecord 表


ConsumerRecord(TABLE)

id BIGINT(64) PK

consumer_id BIGINT(64) //该消费记录归属哪个消费者

queue_id BIGINT(64) //该消费记录消费了某个 topic 某个 queue 的消息

consumer_offset //消费者位移,表征消费者消费进度,每个消费者都有自己的消费者位移

create_time DATETIME(19)

update_time DATETIME(19)


  • 索引为 id、create_time、consumer_id、queue_id

  • 其中 consumer_id 方便快速筛选出某(些)消费者消费的记录

  • queue_id 方便快速筛选出消费了某(些)队列的消费记录

发布于: 刚刚阅读数: 4
用户头像

关注

还未添加个人签名 2018-06-01 加入

还未添加个人简介

评论

发布
暂无评论
架构实战 7 - 消息队列MySql表格设计_架构实战营_源_InfoQ写作社区