写点什么

设计消息队列存储消息数据的 MySQL 表格

作者:唐尤华
  • 2022 年 4 月 02 日
  • 本文字数:2533 字

    阅读完需:约 8 分钟

设计消息队列存储消息数据的 MySQL 表格

作业思路:

1.调查消息队列常见用例,分析需要存储的数据

2.根据调查结果,自行设计,并记录设计思路和理由

3.调查常用消息队列,查漏补缺

1 消息队列常见用例

角色:生产者、消费者、交换器(路由键、绑定、类型)、消息队列

1.1 运转流程

1) 生产者发送消息流程

(1)生产者连接到 RabbitMQ Broker , 建立一个连接(Connection) ,开启一个信道(Channel);(2)生产者声明一个交换器,并设置相关属性,比如交换机类型、是否持久化等;(3)生产者声明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等;(4)生产者通过绑定键(路由键)将交换器和队列绑定起来;(5)生产者发送消息至 RabbitMQ Broker,其中包含路由键、交换器等信息;(6)相应的交换器根据接收到的路由键查找相匹配的队列;(7)如果找到,则将从生产者发送过来的消息存入相应的队列中;(8)如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者;(9)关闭信道;(10)关闭连接。
复制代码

参考 RabbitMQ 运转流程

2) 消费者接收消息流程

(1)消费者连接到 RabbitMQ Broker ,建立一个连接(Connection) ,开启一个信道(Channel);(2)消费者向 RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数,以及做一些准备工作;(3)等待RabbitMQ Broker 回应并投递相应队列中的消息, 消费者接收消息;。(4)消费者确认(ack) 接收到的消息;(5)RabbitMQ 从队列中删除相应己经被确认的消息;(6)关闭信道;(7)关闭连接。
复制代码

1.2 消息队列


RabbitMQ 消息队列参数

消息队列参数说明:

Message TTL : 消息生存期,单位毫秒Auto expire : 队列生存期,单位毫秒Max length : 队列可以容纳的消息的最大条数Max length bytes : 队列可以容纳的消息的最大字节数Overflow behaviour : 队列中的消息溢出后如何处理Dead letter exchange : 溢出的消息需要发送到绑定该死信交换机的队列Dead letter routing key : 溢出的消息需要发送到绑定该死信交换机,并且路由键匹配的队列Maximum priority : 最大优先级Lazy mode : 懒人模式Master locator : 集群相关设置
复制代码

参考 RabbitMQ 消息发送、消费和确认

1.3 生产与消费消息

生产消息参数说明:

exchange:交换器名称routingKey:路由键mandatory:是否强制的,booleanimmediate:是否立即的,booleanprops:BasicProperties 类型,消息属性或者叫消息元数据body:字节数组类型,消息的有效负载
复制代码

消费消息参数说明:

contentType:消息内容类型contentEncoding:消息内容编码headers:头部属性,Key/Value 结构deliveryMode:消息的持久化模式priority:消息优先级,可选值为 0-255correlationId:客户端定义的用于客户端区分和标识消息的唯一标记replyTo:需要应答的目标队列名expiration:消息过期时间,单位为毫秒messageId:消息的唯一标识timestamp:消息发送时的时间戳type:可选的消息类型userId:可选的发布消息的用户的唯一标识appId:可选的发布消息的应用的唯一标识clusterId:集群唯一标识
复制代码

1.4 其他场景(特性)

死信队列延迟队列优先级队列RPC 实现消息顺序持久化确认机制:事务机制、发送方确认机制消息传输保障消息队列管理:多租户与权限、用户管理、Web 端管理、应用管理、集群管理、接口管理消息追踪
复制代码

2 消息数据 MySQL 表格

2.1 消息数据分析

分析:

  • 消息数据与发送、接收、队列、事务、确认、集群、序列化等有关。

  • 消息数据与传递的过程、系统环境无关。比如交换器相关参数、传输协议特定参数、消息队列配置信息。


消息结构:

msgId 消息 Id,供序列化sender 发送方receiver 接收方header 头部信息encoding 编码信息body 消息体createdAt 创建时间expiration 过期时间priority 优先级transactionId 事务 Idtopic 主题,可选qos 质量信息ack 确认信息appId 应用 Id,可选clusterId 集群 Id,可选
复制代码


实例调查:


RocketMQ 消息队列信息补充

producerHost 生产者主机consumerHost 消费者主机createdAt 消息创建时间storedAt 存储时间consumeCount 消费次数properties 属性, key/value对checksum 消息完整性校验
复制代码

参考 RocketMQ 的消息存储结构


Kafka 消息队列信息补充

attributes 压缩算法、时间戳类型、控制指令等producerEpoch 发送端的 Epoch,用于做消息的幂等处理baseSequence BatchRecords 的序列号,用于做消息的幂等处理
复制代码

参考 Kafka消息(存储)格式及索引组织方式

2.2 MySQL 消息数据表

需求

自研消息队列架构
  1. Java 语言编写消息队列服务器;

  2. 消息存储采用 MySQL;

  3. SDK 轮询服务器进行消息写入;

  4. SDK 轮询服务器进行消息读取;

  5. MySQL 双机保证消息尽量不丢;

  6. 使用 Netty 自定义消息格式,并且支持 HTTP 接口。


设计考虑


  • 采用所有消息放一张表,里面加一个“队列名称”。


如果按照队列名称建表,在上面的架构中,数据分组会出现数据分布不均衡(某些数据分组过大或过小)、业务服务器 SDK 轮询写入复杂度较高(业务对应队列名,进而对应存储)。


如果统一放在一张表,架构只需要考虑数据的分片(分组)、数据的高可用。当然,需要选择合适的分片方式。


MySQL 消息数据表


CREATE TABLE `mq_message` (    `id` int NOT NULL AUTO_INCREMENT COMMENT `消息Id`,    `queue` NOT NULL varchar(32) COMMENT `消息队列`,    `topic` NOT NULL varchar(32) COMMENT `消息主题`,    `sender` NOT NULL int COMMENT `发送方`,    `priority` int COMMENT `优先级`,    `header` int COMMENT `头部信息`,    `encoding` int COMMENT `编码信息`,    `body` varchar(512) COMMENT `消息体`,    `checksum` int COMMENT `消息完整性校验`,    `ack` NOT NULL int COMMENT `确认情况`,    `consumeCount` int COMMENT `消费次数`,    `createdAt` timestamp COMMENT `创建时间`,    `storedAt` timestamp COMMENT `存储时间`,    `expiredAt` timestamp COMMENT `过期时间`,    `appId` int COMMENT `应用 Id,可选`,    `properties` varchar(512) COMMENT `属性, key/value`    PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='消息队列消息表';
复制代码


索引:根据分片的要求设计索引。读取信息时的根据客户端读消息 API 参数建立索引。


  • id 索引:分片

  • queue 索引:客户端读

  • topic 索引:客户端读

发布于: 刚刚阅读数: 2
用户头像

唐尤华

关注

还未添加个人签名 2018.03.27 加入

还未添加个人简介

评论

发布
暂无评论
设计消息队列存储消息数据的 MySQL 表格_架构实战营_唐尤华_InfoQ写作平台