消息队列架构设计
前言
本篇的内容是关于微博业务消息队列中间件的详细架构设计文档。在本文档中只涉及消息队列相关的部份。用于指导消息队列后续的开发,测试以及运维。
词汇表
Reactor: 网络编程模式
Netty : 开源的高性能网络编程框架
Redis : 开源的分布式数据库
Kafka : 开源的消息中间件
1. 业务背景
随着某微博业务的不断发展,使用该微博的用户越来越多。导致该微博的卡顿现象越来越严重。经过相关人员的调查后发现,是由于该微博系统中存在大量同
步调用导致系统在大量访问下出现几个明显的现象 :
性能问题: 当用户在微博上发布一条消息后,微博中发布子系统需要调用同步微博系统中的"审核子系统","统计子系统"等相关九个子系统才能完成一次用户
微博的发布功能,性能问题十分突出。
系统耦合问题: 对于该微博系统而言, 如果日后想增加其他的子系统,比如说: "广告子系统"或者移除不使用的子系统的时候,则需要开发对应子系统的接口
连接新的子系统
系统开发效率问题: 对于该微博系统而言,每一个微博中的子系统中的接口都有一些细微的差别,如果每次增加子系统的同时都要将之前的工作重复做一遍,
这样会给开发以及测试,甚至运维团队带来许多重复的工作量.
基于以上对系统的分析以及系统中所存在的问题,因此
在系统中我们需要引入消息队列进行各个系统的解耦,将系统中现存在同步调用更改为异步调用。从而
解决系统中所面临的问题
2. 约束和限制
1.该微博系统后台的数据库采用的是 Oracle.
2.由于对于该微博系统的性能问题使用者意见很大,严重影响到了使用者的体验,因此急于改善该系统的性能问题.因此该只有一个多月的时间完成改造
3.对微博系统改造的成本只有 50 多万,预算有限
4.务必保证改造成功
5.采用熟悉的消息队列,从而保证质量
3. 总体架构
3.1 架构分析
3.1.1 该架构具有高可用性: 对于每一个微博的子系统而言,如果在传输消息的过程中将消息丢了,则会导致相关联的子系统接受不到消息从而影响
到子系统功能的执行。例如:对于审核消息系统而言,如果没有收到需要审核的消息,而该消息触犯了国家的法律法规,则会导致非常严重的后果.
进一步讲,对于不同的子系统丢失消息所造成的后果也不同。这里举个例子: 对于判断用户等级子系统而言,如果系统在传输过程中丢失了消息,则最多会造成
用户没有取得该用户应得的等级和对应的服务,从而也会造成该用户的不满。但是所造成的后果没有审核子系统丢失消息所造成的结果严重。
3.1.2 该架构在人力成本上控制的住,由于在该架构中大量采用成熟的开源组件(比如 Kafka, ActiveMQ),使得对于实现该架构所花费的成本大大下降
.至少在测试环节对于该消息中间件无须太多测试投入。而且在该架构的开发过程中也避免了大量开发成本的投入。而且对于项目后期维护而言,由于采用的是
开源的成熟产品,所以无需在后期维护上花费太多成本
3.1.3 对于该架构而言,由于 Kafka 是一个大容量的日志消息传输器,从而保证我们业务数据的可靠传输
3.1.4 对于该架构而言,Kafka 已经相当成熟,在实际开发团队和测试团队,甚至业务团队中也了解过 Kafka,在实际使用过程中无需在额外投入这方
面的成本
3.2 总体架构
1. 对于整个消息队列架构而言,首先采用数据分散集群的架构。对于整个集群中的服务器根据服务的子系统进行分组,每个分组存储一部分消息数据。
2. 对于每个分组中的集群分配一台主 Oracle 数据库服务器和一台备用 Oracle 数据库服务器,在每一个分组内主备数据库复制,分组与分组之间的数据
不同步。
3. 在通常的情况下,分组内的主服务器对外提供的消息的写入和消息的读取服务,备用服务器不对外提供服务,仅仅提供数据备份功能。
4. 在主服务器宕机的情况下,备用服务器对外提供消息的读取,保存的服务。
5. 客户端采取轮询的方式来写入和读取消息。
4. 详细设计
1. 首先客户端链接到消息队列的服务器,打一个通讯通道 channel
2. 客户端声明一个 exchange,并且设置相关属性
3. 客户端声明一个 queue,并且设置相关的属性
4. 客户端使用 routing key,在 exchange 和 queue 之间建立好绑定关系
5. 客户端投递消息到 exchange.
最后,exchange 接受到消息后,就根据消息的 key 和已经设置的 binding,进行消息路由,将消息投递到一个或者多个队列里
4.1 核心功能
4.1.1 消息发送流程:
消息队列系统设计两个角色: 生产者和消费者, 每个角色都有唯一的名称。这里以 Kafka 的消息发送流程为例: 首先调用
KafkaProducer 的 send,该方法不会直接向 broker 发送消息,kafka 以异步形式发送消息并且将其分解成两个步骤: send 方法的职责是将消息追加到内存中,然
后会由专门的 Send 线程异步将缓存中的消息指发送到 Kafka Broker 中。
4.2 关键设计
4.2.1. 保证消息发送的可靠性:
在每个业务服务器中加入消息队列系统所提供的 SDK,SDK 支持轮询发送消息。当一个分组的主要服务器无法发送消息时,SDK 挑选下
一个分组主服务器重发消息,依次尝试所有主服务器直到发送成功;如果全部主服务器都无法发送,SDK 可以保存并且缓存该消息,也可以直接丢弃该消息,具
体的策略可以在启动 SDK 的时候进行配置指定。如果 SDK 缓存了一 些消息并未发送,此时恰好业务服务器又已经重启,则所有缓存的消息将永久的丢失,在这种
情况下 SDK 不做任何处理,业务方需要针对非常关键的消息自己实现永久存储的问题
4.2.2. 保证消息存储可靠性:
消息队列中的每条消息都存储在 Oracle 表中,每个分组有一主一备两台 Oracle 数据库服务器,主备 Oracle 数据库服务器之间复制消
息以保证消息存储的高可用。如果主备之间出现复制延迟,如果此时 Oracle 数据库主服务器宕机导致数据无法恢复,则部分的消息会永久丢失。 在这种情况
下也不做针对性的设计, DBA 需要对主备间的复制延迟进行监控,当复制延迟超过 30 秒的时候需要及时警告并且进行处理
4.2.3. 消息如何存储
每个消息队列在 Oracle 中对应的就是一张表,消息队列名就是表名称,表的结构设计如下:
主键,详细消息数据,消息发送目的地,消息发送状态,消息创建时间
4.2.4. 如果保证避免发送重复消息
常用的消息队列基本上都能确保消息到达,但是不能保证消息重复发送,所以可以存在一条消息被重复发送。一般情况下,成功将
消息发送给目的地后,会接收到回应消息。但是如果当网络出现问题时,这时候目的地成功收到消息,但是由于网络问题,发送端一般会重发消息,所以会导
致发送重复发送多条相同的消息给接收方。以下是几种解决方案:
4.2.4.1 乐观锁
4.2.4.2 唯一索引
4.2.4.3 记录每条已经发送的消息状态
4.2.5 如果保证发送的消息不丢失消息
4.2.5.1 在发送端向 broker 发送消息时,如果由于网络抖动等一些不可控原因导致消息发送失败,可以设置失败重试次数让消息重
发。
4.2.5.2 在接收端接收消息时,由于网络等不可控的原因没法从 broker 发送到目的端,此时会重试直到发送成功
4.2.5.3 接收端已经正常接收到消息但是在执行后续消息处理时发生了异常,最终返回处理失败。
为了保证消息是肯定被至少消费
成功一次,Kafka 会把这批消息重发回 Broker,在延迟的某个时间点后,重新再次发送。而如果一直这样重复消费都持续失败到一定次数(该重复尝试次数可设
置),就会投递到 DLQ 死信队列。应用可以监控死信队列来做人工干预。
4.3 设计规范
4.3.1 整个项目是基于 Spring Boot + Netty 进行开发的
4.3.2 采用 Oracle 数据库服务器
4.3.3 发送的数据包以 Json 格式与服务器作交互
4.3.4 连接 Netty 的客户端采用 Java 语言进行开发,并且基于 Netty 实现与各个服务器交互
4.3.5 服务器基于 ZooKeeper 进行主从切换
4.3.6 每个消息队列对应一张表
4.3.7 每个消息表中最多存储 30 天内的消息,过期的消息自动清除
4.3.8 直接采用 Oracle 的主从复制来实现数据复制
5. 质量设计
5.1 消息队列管理后台
为了方便管理消息队列中所传递的消息,并且实时的监测和维护消息队列。
5.2 成本
在项目开发过程中需要严格控制成本,项目的成本控制浮动在 10 左右。
5.3 发送数据包规范
以 Json 数据格式发送数据
5.4 开发语言
采用 Java 语言进行开发
6. 演进规划
6.1. 消息队列第一期 : 实现消息发送,权限控制的功能,大致时间为 3 个月
6.2. 消息队列第二期 : 实现消息读取功能,预计时间为 1 个月
6.3: 消息队列第三期 : 实现消息主备基于 ZooKeeper 切换的功能,预计时间为 1 个月
评论