前言
对于分布式消息中间件,首先要了解两个基础的概念,即什么是分布式系统,什么又是中间件。
分布式系统
“A distributed system is one in which components located at networked computers communicate and coordinate their actions only by passing messasges.”——《Distributed Systems Concepts and Design》
从上面这个解释可以得到分布式系统的两个特点:组件分布在网络计算机上组件之间通过消息来协调行动
中间件
Middleware is computer software that provides services to software applications beyond those available from the operating system. It can be described as "software glue". Middleware makes it easier for software developers to implement communication and input/output, so they can focus on the specific purpose of their application.——维基百科
中间件被描述为为应用程序提供操作系统所提供的服务之外的服务,简化应用程序的通信、输入输出的开发,使他们专注于自己的业务逻辑。从维基百科上对中间件的解释感觉有点绕,其实可以从“空间”的角度去理解中间件,即中间件是处于“中间层”的组件,是上层的应用程序和底层的服务之间的桥梁(比如 DB 中间件的上层是应用程序,底层是 DB 服务),也是应用与应用之间的桥梁(比如分布式服务组件)。
分布式消息中间件
“Message-oriented middleware (MOM) is software or hardware infrastructure supporting sending and receiving messages between distributed systems.”——维基百科
维基百科给出的消息中间件的定义是支持在分布式系统中发送和接受消息的硬件或软件基础设施(对我们这里讨论的范围来说肯定就是软件了)。
那么分布式消息中间件其实就是指消息中间件本身也是一个分布式系统。
消息中间件能做什么?
任何中间件必然都是要去解决特定领域的某个问题,消息中间件解决的就是分布式系统之间消息传递的问题。消息传递是分布式系统必然要面对的一个问题。
简单概括一下消息中间件的应用场景大致如下:
业务解耦:交易系统不需要知道短信通知服务的存在,只需要发布消息
削峰填谷:比如上游系统的吞吐能力高于下游系统,在流量洪峰时可能会冲垮下游系统,消息中间件可以在峰值时堆积消息,而在峰值过去后下游系统慢慢消费消息解决流量洪峰的问题
事件驱动:系统与系统之间可以通过消息传递的形式驱动业务,以流式的模型处理
分布式消息中间件长什么样?
一个抽象的对分布式消息中间件的认知大概是这样:
别嫌啰嗦,大致介绍一下,方便下面的理解,本系列主要讲三个常用的消息中间件,也就是 Rabbitmq、RocketMq 和 Kafka,当然篇幅所限肯定讲不完,只能挑比较重要的东西写,但也能让不会的同学初步掌握怎么去使用。
可以直接点击蓝字领取
好了,话不多说,发车喽!
RabbitMQ 除了像兔子一样跑的很快以外,还有这些特点:
一、Rabbitmq 消息队列应用
1、RabbitMQ 介绍
RabbitMQ 是一款基于 AMQP(消息队列协议),由 Erlang 开发的开源消息队列组件。是一款优秀的消息队列组件,他由两部分组成:服务端和客户端,客户端支持多种语言的驱动,如:.Net、JAVA、Erlang 等。RabbitMQ 与其他消息队列组件性能比较,在此不作介绍,网上有大把的资料。
2、RabbitMQ 原理简介
RabbitMQ 中间件分为服务端(RabbitMQ Server)和客户端(RabbitMQ Client),服务端可以理解为是一个消息的代理消费者,客户端又分为消息生产者(Producer)和消息消费者(Consumer)。
2.1 消息生产者(Producer):主要生产消息并将消息基于 TCP 协议,通过建立 Connection 和 Channel,将消息传输给 RabbitMQ Server,对于 Producer 而言基本就完成了工作。
2.2 服务端(RabbitMQ Server):主要负责处理消息路由、分发、入队列、缓存和出列。主要由三部分组成:Exchange、RoutingKey、Queue。
(1)Exchange:用于接收消息生产者发送的消息,有三种类型的 exchange:direct, fanout,topic,不同类型实现了不同的路由算法;
A. direct exchange:将与 routing key 比配的消息,直接推入相对应的队列,创建队列时,默认就创建同名的 routing key。
B. fanout exchange:是一种广播模式,忽略 routingkey 的规则。
C. topic exchange:应用主题,根据 key 进行模式匹配路由,例如:若为 abc 则推入到所有 abc 相对应的 queue;若为 abc.#则推入到 abc.xx.one ,abc.yy.two 对应的 queue。
(2)RoutingKey:是 RabbitMQ 实现路由分发到各个队列的规则,并结合 Binging 提供于 Exchange 使用将消息推送入队列;
(3)Queue:是消息队列,可以根据需要定义多个队列,设置队列的属性,比如:消息移除、消息缓存、回调机制等设置,实现与 Consumer 通信;
2.3 消息消费者(Consumer):主要负责消费 Queue 的消息,同样基于 TCP 协议,通过建立 Connection 和 Channel 与 Queue 传输消息,一个消息可以给多个 Consumer 消费;
2.4 关键名词说明:Connection、Channel、Binging 等;
(1)Connection:是建立客户端与服务端的连接。
(2)Channel:是基于 Connection 之上建立通信通道,因为每次 Connection 建立 TCP 协议通信开销及性能消耗较大,所以一次建立 Connection 后,使用多个 Channel 通道通信减少开销和提高性能。
(3)Binging:是一个捆绑定义,将 exchange 和 queue 捆绑,定义 routingkey 相关策略。
3、RabbitMQ 安装部署
以上对 RabbitMQ 简介,接下来我们通过实际搭建消息队列服务实践。RabbitMQ 服务端能运行于 Window、Linux 和 Mac 平台,客户端也支持多种技术的实现。本次我们将在 Linux 之 CentOS7 平台搭建。
3.1 安装 Erlang 运行环境
由于 RabbitMQ 使用 Erlang 技术开发,所以需要先安装 Erlang 运行环境后,才能安装消息队列服务。
(1)配置系统能正常访问公网,设置默认网关
`route add ``default` `gw 192.168.1.1`
复制代码
(2)安装 erlang
`su -c ``'rpm -Uvh http://download.fedoraproject.org/pub/epel/7/x86_64/e/epel-release-7-5.noarch.rpm'`
`sudo yum install erlang`
复制代码
(3)检查 erlang 是否安装成功
(4)安装成功
3.2 安装 RabbitMQ 服务端
(1)下载安装包
`wget http:``//www.rabbitmq.com/releases/rabbitmq-server/v3.6.0/rabbitmq-server-3.6.0-1.noarch.rpm`
复制代码
(2)安装和配置 RabbitMQ 服务端,3.6.0 版本:
`rpm --import https:``//www.rabbitmq.com/rabbitmq-signing-key-public.asc`
`yum install rabbitmq-server-3.6.0-1.noarch.rpm`
复制代码
(3)启用 web 管理插件
`rabbitmq-plugins enable rabbitmq_management`
复制代码
(4)启动 RabbitMQ
`chkconfig rabbitmq-server ``on`
`/sbin/service rabbitmq-server start`
复制代码
(5)防火墙开通端口
`# firewall-cmd --permanent --zone=public --add-port=5672/tcp`
`# firewall-cmd --permanent --zone=public --add-port=15672/tcp`
`# firewall-cmd --reload`
复制代码
(6)rabbitmq 默认会创建 guest 账号,只能用于 localhost 登录页面管理员,本机访问地址:
http://localhost:15672/
`rabbitmqctl add_user test test`
`rabbitmqctl set_user_tags test administrator<br>rabbitmqctl set_permissions -p / test ``".*"` `".*"` `".*"`
复制代码
RabbitMQ 管理员页面。
4、RabbitMQ 应用
本章节描述,web 应用生产的日志,通过 rabbitmq 传输,然后日志服务接收消息队列的消息。
本系统采用官方的 Client,通过 nuget 引用。
4.1 Web 应用生产业务日志
[HttpPost]
public ActionResult Create()
{
this.HttpContext.Session["mysession"] = DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss");
var txt = Request.Form["txtSite"].ToString();
RabbitMQHelper helper = new RabbitMQHelper();
helper.SendMsg(txt + ",操作日志,时间:" + DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss"));
return RedirectToAction("Index");
}
`}`
复制代码
页面效果图。
4.2 日志服务接收日志消息
基于 window form 开发一个日志处理服务,并将接收的消息打印出来。
private void btnReceive_Click(object sender, EventArgs e)
{
isConnected = true;
using (var channel = connection.CreateModel())
{
channel.QueueDeclare("MyLog", false, false, false, null);
var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume("MyLog", true, consumer);
while (isConnected)
{
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
txtMsg.Text += message + "\r\n";
}
}
}
复制代码
4.3 RabbitMQ 页面监控情况
RabbitMQ 自带页面监控工具,通过此工具可以监控 MQ 的情况:
完整版的消息中间件学习资料和我个人整理的笔记可以直接点击蓝字领取
二、Rabbitmq 消息确认机制
1、生产端 Confirm 消息确认机制
消息的确认,是指生产者投递消息后,如果 Broker 收到消息,则会给我们生产者一个应答。生产者进行接收应答,用来确定这条消息是否正常的发送到 Broker ,这种方式也是消息的可靠性投递的核心保障!
Confirm 确认机制流程图
2、如何实现 Confirm 确认消息?
第一步:在 channel 上开启确认模式: channel.confirmSelect()
第二步:在 channel 上添加监听: channel.addConfirmListener(ConfirmListener listener);
, 监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理!
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
public class ConfirmProducer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_confirm_exchange";
String routingKey = "item.update";
//指定消息的投递模式:confirm 确认模式
channel.confirmSelect();
//发送
final long start = System.currentTimeMillis();
for (int i = 0; i < 5 ; i++) {
String msg = "this is confirm msg ";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
System.out.println("Send message : " + msg);
}
//添加一个确认监听, 这里就不关闭连接了,为了能保证能收到监听消息
channel.addConfirmListener(new ConfirmListener() {
/**
* 返回成功的回调函数
*/
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("succuss ack");
System.out.println(multiple);
System.out.println("耗时:" + (System.currentTimeMillis() - start) + "ms");
}
/**
* 返回失败的回调函数
*/
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.printf("defeat ack");
System.out.println("耗时:" + (System.currentTimeMillis() - start) + "ms");
}
});
}
}
复制代码
`import com.rabbitmq.client.*;
import java.io.IOException;
public class ConfirmConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(3000);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_confirm_exchange";
String queueName = "test_confirm_queue";
String routingKey = "item.#";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, false, false, false, null);
//一般不用代码绑定,在管理界面手动绑定
channel.queueBind(queueName, exchangeName, routingKey);
//创建消费者并接收消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
//设置 Channel 消费者绑定队列
channel.basicConsume(queueName, true, consumer);
}
}
复制代码
我们此处只关注生产端输出消息
Send message : this is confirm msg
Send message : this is confirm msg
Send message : this is confirm msg
Send message : this is confirm msg
Send message : this is confirm msg
succuss ack
true
耗时:3ms
succuss ack
true
耗时:4ms
复制代码
注意事项
我们采用的是异步 confirm 模式:提供一个回调方法,服务端 confirm 了一条或者多条消息后 Client 端会回调这个方法。除此之外还有单条同步 confirm 模式、批量同步 confirm 模式,由于现实场景中很少使用我们在此不做介绍,如有兴趣直接参考官方文档。
我们运行生产端会发现每次运行结果都不一样,会有多种情况出现,因为 Broker 会进行优化,有时会批量一次性 confirm ,有时会分开几条 confirm。
`succuss ack
true
耗时:3ms
succuss ack
false
耗时:4ms
或者
succuss ack
true
耗时:3ms`
复制代码
3、Return 消息机制
Return Listener 用于处理一-些不可路 由的消息!
消息生产者,通过指定一个 Exchange
和 Routingkey
,把消息送达到某一个队列中去,然后我们的消费者监听队列,进行消费处理操作!
但是在某些情况下,如果我们在发送消息的时候,当前的 exchange 不存在或者指定的路由 key 路由不到,这个时候如果我们需要监听这种不可达的消息,就要使用 Return Listener !
在基础 API 中有一个关键的配置项:Mandatory
:如果为 true
,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为 false
,那么 broker 端自动删除该消息!
Return 消息机制流程图
Return 消息示例
首先我们需要发送三条消息,并且故意将第 0 条消息的 routing Key
设置为错误的,让他无法正常路由到消费端。
mandatory
设置为 true
路由不可达的消息会被监听到,不会被自动删除.即channel.basicPublish(exchangeName, errRoutingKey, true,null, msg.getBytes());
最后添加监听即可监听到不可路由到消费端的消息channel.addReturnListener(ReturnListener r))
`import com.rabbitmq.client.*;
import java.io.IOException;
public class ReturnListeningProducer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_return_exchange";
String routingKey = "item.update";
String errRoutingKey = "error.update";
//指定消息的投递模式:confirm 确认模式
channel.confirmSelect();
//发送
for (int i = 0; i < 3 ; i++) {
String msg = "this is return——listening msg ";
//@param mandatory 设置为 true 路由不可达的消息会被监听到,不会被自动删除
if (i == 0) {
channel.basicPublish(exchangeName, errRoutingKey, true,null, msg.getBytes());
} else {
channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes());
}
System.out.println("Send message : " + msg);
}
//添加一个确认监听, 这里就不关闭连接了,为了能保证能收到监听消息
channel.addConfirmListener(new ConfirmListener() {
/**
* 返回成功的回调函数
*/
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("succuss ack");
}
/**
* 返回失败的回调函数
*/
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.printf("defeat ack");
}
});
//添加一个 return 监听
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("return relyCode: " + replyCode);
System.out.println("return replyText: " + replyText);
System.out.println("return exchange: " + exchange);
System.out.println("return routingKey: " + routingKey);
System.out.println("return properties: " + properties);
System.out.println("return body: " + new String(body));
}
});
}
}
复制代码
`import com.rabbitmq.client.*;
import java.io.IOException;
public class ReturnListeningConsumer {
public static void main(String[] args) throws Exception {
//1\. 创建一个 ConnectionFactory 并进行设置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(3000);
//2\. 通过连接工厂来创建连接
Connection connection = factory.newConnection();
//3\. 通过 Connection 来创建 Channel
Channel channel = connection.createChannel();
//4\. 声明
String exchangeName = "test_return_exchange";
String queueName = "test_return_queue";
String routingKey = "item.#";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, false, false, false, null);
//一般不用代码绑定,在管理界面手动绑定
channel.queueBind(queueName, exchangeName, routingKey);
//5\. 创建消费者并接收消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
//6\. 设置 Channel 消费者绑定队列
channel.basicConsume(queueName, true, consumer);
}
}
复制代码
我们只关注生产端结果,消费端只收到两条消息。
`Send message : this is return——listening msg
Send message : this is return——listening msg
Send message : this is return——listening msg
return relyCode: 312
return replyText: NO_ROUTE
return exchange: test_return_exchange
return routingKey: error.update
return properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
return body: this is return——listening msg
succuss ack
succuss ack
succuss ack
复制代码
4、消费端 Ack 和 Nack 机制
消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿!如果由于服务器宕机等严重问题,那我们就需要手工进行 ACK 保障消费端消费成功!消费端重回队列是为了对没有处理成功的消息,把消息重新会递给 Broker!一般我们在实际应用中,都会关闭重回队列,也就是设置为 False。
参考 api
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
void basicAck(long deliveryTag, boolean multiple) throws IOException;
如何设置手动 Ack 、Nack 以及重回队列
首先我们发送五条消息,将每条消息对应的循环下标 i 放入消息的 properties
中作为标记,以便于我们在后面的回调方法中识别。
其次, 我们将消费端的 ·channel.basicConsume(queueName, false, consumer);
中的 autoAck
属性设置为 false
,如果设置为true
的话 将会正常输出五条消息。
我们通过 Thread.sleep(2000)
来延时一秒,用以看清结果。我们获取到properties
中的num
之后,通过channel.basicNack(envelope.getDeliveryTag(), false, true);
将 num
为 0 的消息设置为 nack,即消费失败,并且将 requeue
属性设置为true
,即消费失败的消息重回队列末端。
`import com.rabbitmq.client.*;
import java.util.HashMap;
import java.util.Map;
public class AckAndNackProducer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_ack_exchange";
String routingKey = "item.update";
String msg = "this is ack msg";
for (int i = 0; i < 5; i++) {
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("num" ,i);
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(2)
.headers(headers)
.build();
String tem = msg + ":" + i;
channel.basicPublish(exchangeName, routingKey, true, properties, tem.getBytes());
System.out.println("Send message : " + msg);
}
channel.close();
connection.close();
}
}
复制代码
import com.rabbitmq.client.*;
import java.io.IOException;
public class AckAndNackConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(3000);
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
String exchangeName = "test_ack_exchange";
String queueName = "test_ack_queue";
String routingKey = "item.#";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, false, false, false, null);
//一般不用代码绑定,在管理界面手动绑定
channel.queueBind(queueName, exchangeName, routingKey);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if ((Integer) properties.getHeaders().get("num") == 0) {
channel.basicNack(envelope.getDeliveryTag(), false, true);
} else {
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//6\. 设置 Channel 消费者绑定队列
channel.basicConsume(queueName, false, consumer);
}
}
复制代码
我们此处只关心消费端输出,可以看到第 0 条消费失败重新回到队列尾部消费。
[x] Received 'this is ack msg:1'
[x] Received 'this is ack msg:2'
[x] Received 'this is ack msg:3'
[x] Received 'this is ack msg:4'
[x] Received 'this is ack msg:0'
[x] Received 'this is ack msg:0'
[x] Received 'this is ack msg:0'
[x] Received 'this is ack msg:0'
[x] Received 'this is ack msg:0'
复制代码
三、Rabbitmq 镜像队列
1、 镜像队列的设置
镜像队列的配置通过添加 policy 完成,policy 添加的命令为:
rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]
-p Vhost: 可选参数,针对指定 vhost 下的 queue 进行设置
Name: policy 的名称
Pattern: queue 的匹配模式(正则表达式)
Definition: 镜像定义,包括三个部分 ha-mode,ha-params,ha-sync-mode
ha-mode: 指明镜像队列的模式,有效值为 all/exactly/nodes
- exactly表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
复制代码
ha-params: ha-mode 模式需要用到的参数
ha-sync-mode: 镜像队列中消息的同步方式,有效值为 automatic,manually
Priority: 可选参数, policy 的优先级
例如,对队列名称以 hello 开头的所有队列进行镜像,并在集群的两个节点上完成镜像,policy 的设置命令为:
rabbitmqctl set_policy hello-ha "^hello" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
复制代码
2、 镜像队列的大概实现
2.1 整体介绍
通常队列由两部分组成:一部分是 amqqueue_process,负责协议相关的消息处理,即接收生产者发布的消息、向消费者投递消息、处理消息 confirm、acknowledge 等等;另一部分是 backing_queue,它提供了相关的接口供 amqqueue_process 调用,完成消息的存储以及可能的持久化工作等。
镜像队列同样由这两部分组成,
amqqueue_process 仍旧进行协议相关的消息处理
backing_queue 则是由 master 节点和 slave 节点组成的一个特殊的 backing_queue
master 节点和 slave 节点都由一组进程组成,一个负责消息广播的 gm,一个负责对 gm 收到的广播消息进行回调处理。
master 节点上回调处理是 coordinator
slave 节点上则是 mirror_queue_slave。mirror_queue_slave 中包含了普通的 backing_queue 进行消息的存储
master 节点中 backing_queue 包含在 mirror_queue_master 中由 amqqueue_process 进行调用。
注意:消息的发布与消费都是通过 master 节点完成。master 节点对消息进行处理的同时将消息的处理动作通过 gm 广播给所有的 slave 节点,slave 节点的 gm 收到消息后,通过回调交由 mirror_queue_slave 进行实际的处理。
2.2 gm(Guaranteed Multicast)
传统的主从复制方式:由 master 节点负责向所有 slave 节点发送需要复制的消息,在复制过程中,如果有 slave 节点出现异常,master 节点需要作出相应的处理;如果是 master 节点本身出现问题,那么 slave 节点间可能会进行通信决定本次复制是否继续。当然为了处理各种异常情况,整个过程中的日志记录是免不了的。
然而 rabbitmq 中并没有采用这种方式,而是将所有的节点形成一个循环链表,每个节点都会监控位于自己左右两边的节点,当有节点新增时,相邻的节点保证当前广播的消息会复制到新的节点上;当有节点失效时,相邻的节点会接管保证本次广播的消息会复制到所有节点。
在 master 节点和 slave 节点上的这些 gm 形成一个 group,group 的信息会记录在 mnesia 中。不同的镜像队列形成不同的 group。
消息从 master 节点对应的 gm 发出后,顺着链表依次传送到所有节点,由于所有节点组成一个循环链表,master 节点对应的 gm 最终会收到自己发送的消息,这个时候 master 节点就知道消息已经复制到所有 slave 节点了。
2.3 重要的表结构
rabbit_queue 表记录队列的相关信息:
-record(amqqueue,
{
name, %%队列的名称
durable, %%标识队列是否持久化
auto_delete, %%标识队列是否自动删除
exclusive_owner, %%标识是否独占模式
arguments, %%队列创建时的参数
pid, %%amqqueue_process进程PID
slave_pids, %%mirror_queue_slave进程PID集合
sync_slave_pids, %%已同步的slave进程PID集合
policy, %%与队列有关的policy
%%通过set_policy设置,没有则为undefined
gm_pids, %%{gm,mirror_queue_coordinator},{gm,mirror_queue_slave}进程PID集合
decorator %%
}).
复制代码
注意:slave_pids 的存储是按照 slave 加入的时间来排序的,以便 master 节点失效时,提升"资格最老"的 slave 节点为新的 master。
gm_group 表记录 gm 形成的 group 的相关信息:
-record(gm_group,
{
name, %%group的名称,与queue的名称一致
version, %%group的版本号, 新增节点/节点失效时会递增
members, %%group的成员列表, 按照节点组成的链表顺序进行排序
}).
复制代码
3、镜像队列的一些细节
3.1 新增节点
slave 节点先从 gm_group 中获取对应 group 的所有成员信息,然后随机选择一个节点并向这个节点发送请求,这个节点收到请求后,更新 gm_group 对应的信息,同时通知左右节点更新邻居信息(调整对左右节点的监控)及当前正在广播的消息,然后回复通知请求节点成功加入 group。请求加入 group 的节点收到回复后再更新 rabbit_queue 中的相关信息,并根据需要进行消息的同步。
3.2 消息的广播
消息从 master 节点发出,顺着节点链表发送。在这期间,所有的 slave 节点都会对消息进行缓存,当 master 节点收到自己发送的消息后,会再次广播 ack 消息,同样 ack 消息会顺着节点链表经过所有的 slave 节点,其作用是通知 slave 节点可以清除缓存的消息,当 ack 消息回到 master 节点时对应广播消息的生命周期结束。
下图为一个简单的示意图,A 节点为 master 节点,广播一条内容为"test"的消息。"1"表示消息为广播的第一条消息;"id=A"表示消息的发送者为节点 A。右边是 slave 节点记录的状态信息。
为什么所有的节点都需要缓存一份发布的消息呢?
master 发布的消息是依次经过所有 slave 节点,在这期间的任何时刻,有可能有节点失效,那么相邻的节点可能需要重新发送给新的节点。例如,A->B->C->D->A 形成的循环链表,A 为 master 节点,广播消息发送给节点 B,B 再发送给 C,如果节点 C 收到 B 发送的消息还未发送给 D 时异常结束了,那么节点 B 感知后节点 C 失效后需要重新将消息发送给 D。同样,如果 B 节点将消息发送给 C 后,B,C 节点中新增了 E 节点,那么 B 节点需要再将消息发送给新增的 E 节点。
gm 的状态记录:
-record(state,
{
self, %%gm本身的ID
left, %%该节点左边的节点
right, %%该节点右边的节点
group_name, %%group名称 与队列名一致
module, %%回调模块 rabbit_mirror_queue_slave或者
%%rabbit_mirror_queue_coordinator
view, %%group成员列表视图信息
%%记录了成员的ID及每个成员的左右邻居节点
pub_count, %%当前已发布的消息计数
members_state, %%group成员状态列表 记录了广播状态:[#member{}]
callback_args, %%回调函数的参数信息
%%rabbit_mirror_queue_slave/rabbit_mirror_queue_coordinator进程PID
confirms, %%confirm列表
broadcast_buffer, %%缓存待广播的消息
broadcast_timer, %%广播消息定时器
txn_executor
}).
-record(member,
{
pending_ack, %%待确认的消息,也就是已发布的消息缓存的地方
last_pub, %%最后一次发布的消息计数
last_ack %%最后一次确认的消息计数
}).
复制代码
3.3 节点的失效
当 slave 节点失效时,仅仅是相邻节点感知,然后重新调整邻居节点信息、更新 rabbit_queue、gm_group 的记录等。如果是 master 节点失效,"资格最老"的 slave 节点被提升为 master 节点,slave 节点会创建出新的 coordinator,并告知 gm 修改回调处理为 coordinator,原来的 mirror_queue_slave 充当 amqqueue_process 处理生产者发布的消息,向消费者投递消息等。
上面提到如果是 slave 节点失效,只有相邻的节点能感知到,那么 master 节点失效是不是也是只有相邻的节点能感知到?假如是这样的话,如果相邻的节点不是"资格最老"的节点,怎么通知"资格最老"的节点提升为新的 master 节点呢?
实际上,所有的 slave 节点在加入 group 时,mirror_queue_slave 进程会对 master 节点的 amqqueue_process 进程(也可能是 mirror_queue_slave 进程)进行监控,如果 master 节点失效的话,mirror_queue_slave 会感知,然后再通过 gm 进行广播,这样所有的节点最终都会知道 master 节点失效。当然,只有"资格最老"的节点会提升自己为新的 master。
另外,在 slave 提升为 master 时,mirror_queue_slave
内部来了一次"偷梁换柱",即原本需要回调mirror_queue_slave
的handle_call/handle_info/handle_cast
等接口进行处理的消息,全部改为调用amqqueue_process
的handle_call/handle_info/handle_cast
等接口,从而可以解释上面说的,mirror_queue_slave
进程充当了amqqueue_process
完成协议相关的消息的处理。
rabbit_mirror_queue_slave.erl
handle_call({gm_deaths,LiveGMPids},From,
State = #state{q = Q = #amqqueue{name=QName,pid=MPid}})->
Self = self(),
case rabbit_mirror_queue_misc:remove_from_queue(QName,
Self,
LiveGMPids) of
{ok,Pid,DeadPids} ->
case Pid of
MPid ->
%% master hasn't changed
gen_server2:reply(From, ok),
noreply(State);
Self ->
%% we've become master
QueueState = promote_me(From,State),
{become,
%% 改由rabbit_amqqueue_process模块处理消息
rabbit_amqqueue_process,
QueueState, hibernate};
...
gen_server2.erl
handle_common_reply(Reply,Msg,GS2State = #gs2_state{name=Name,
debug=Debug})->
case Reply of
...
{become, Mod, NState, Time1} ->
Debug1=common_become(Name,Mod,NState,Debug),
loop(find_prioritisers(
GS2State#gs2_state{mod=Mod,
state=NState,
time=Time1,
debug=Debug1}));
...
handle_msg({'gen_call',From,Msg},
GS2State=#gs2_state{mod=Mod,
state=State,
name=Name,
debug=Debug}) ->
case catch Mod:handle_call(Msg, From, State) of
...
handle_msg(Msg,GS2State=#gs2_state{mod=Mod,state=State})->
Reply = (catch dispatch(Msg,Mod,State)),
handle_common_reply(Reply, Msg, GS2State).
dispatch({'$gen_cast',Msg},Mod,State)->
Mod:handle_cast(Msg, State);
dispatch(Info, Mod, State)->
Mod:handle_info(Info,State).
复制代码
4、消息的同步
配置镜像队列的时候有个 ha-sync-mode 属性,这个有什么用呢?
新节点加入到 group 后,最多能从左边节点获取到当前正在广播的消息内容,加入 group 之前已经广播的消息则无法获取到。如果此时 master 节点不幸失效,而新节点有恰好成为了新的 master,那么加入 group 之前已经广播的消息则会全部丢失。
注意:这里的消息具体是指新节点加入前已经发布并复制到所有 slave 节点的消息,并且这些消息还未被消费者消费或者未被消费者确认。如果新节点加入前,所有广播的消息被消费者消费并确认了,master 节点删除消息的同时会通知 slave 节点完成相应动作。这种情况等同于新节点加入前没有发布任何消息。
避免这种问题的解决办法就是对新的 slave 节点进行消息同步。当 ha-sync-mode 配置为自动同步(automatic)时,新节点加入 group 时会自动进行消息的同步;如果配置为 manually 则需要手动操作完成同步
就先写到这把,本来是想一篇文把三个中间件都写了的,没想到不知不觉写了这么多我都感觉 Rabbitmq 还有很多东西还没写到,后面会再写两篇专门讲一下 RocketMq 和 kafka,感兴趣的朋友可以给我点个关注。
完整版的消息中间件学习资料和我个人整理的笔记直接点击蓝字领取
如果可以点个赞就更好了,你说呢
end
评论