理解 RabbitMQ 中的 AMQP-0-9-1 模型,深入 java 虚拟机第三版百度网盘
AMQP 协议
[AMQP](
)全称是 Advanced Message Queuing Protocol,它是一个(分布式)消息传递协议,使用和符合此协议的客户端能够基于使用和符合此协议的消息传递中间件代理(Broker,也就是经纪人,个人感觉叫代理合口一些)进行通信。AMQP 目前已经推出协议 1.0,实现此协议的比较知名的产品有 StormMQ、RabbitMQ、Apache Qpid 等。RabbitMQ 实现的 AMQP 版本是 0.9.1,官方文档中也提供了该协议 pdf 文本下载,有兴趣可以翻阅一下。
消息中间件代理的职责
Messaging Broker,这里称为消息中间件代理。它的职责是从发布者(Publisher,或者有些时候称为 Producer,生产者)接收消息,然后把消息路由到消费者(Consumer,或者有些时候称为 Listener,监听者)。
因为消息中间件代理、发布者客户端和消费者客户端都是基于 AMQP 这一网络消息协议,所以消息中间件代理、发布者客户端和消费者客户端可以在不同的机器上,从而实现分布式通讯和服务解耦。
消息中间件代理不仅仅提供了消息接收和消息路由这两个基本功能,还有其他高级的特性如消息持久化功能、监控功能等等。
AMQP-0-9-1 在 RabbitMQ 中的基本模型
AMQP-0-9-1 模型的基本视图是:消息发布者消息发布到交换器(Exchange)中,交换器的角色有点类似于日常见到的邮局或者信箱。然后,交换器把消息的副本分发到队列(Queue)中,分发消息的时候遵循的规则叫做绑定(Binding)。接着,消息中间件代理向订阅队列的消费者发送消息(push 模式),或者消费者也可以主动从队列中拉取消息(fetch/pull 模式)。
发布者在发布消息的时候可以指定消息属性(消息元数据),某些消息元数据可能由消息中间件代理使用,其他消息元数据对于消息中间件代理而言是不透明的,仅供消息消费者使用。
由于网络是不可靠的,客户端可能无法接收消息或者处理消息失败,这个时候消息中间件代理无法感知消息是否正确传递到消费者中,因此 AMQP 模型提供了消息确认(Message Acknowledgement)的概念:当消息传递到消费者,消费者可以自动向消息中间件代理确认消息已经接收成功或者由应用程序开发者选择手动确认消息已经接收成功并且向消息中间件代理确认消息,消息中间件代理只有在接收到该消息(或者消息组)的确认通知后才会从队列中完全删除该消息。
在某些情况下,交换器无法正确路由到队列中,那么该消息就会返回给发布者,或者丢弃,或者如果消息中间件代理实现了"死信队列(Dead Letter Queue)"扩展,消息会被放置到死信队列中。消息发布者可以选择使用对应的参数控制路由失败的处理策略。
交换器和交换器类型
交互器(Exchange)是消息发送的第一站目的地,它的作用就是就收消息并且将其路由到零个或者多个队列。路由消息的算法取决于交互器的类型和路由规则(也就是 Binding)。RabbitMQ 消息中间件代理支持四种类型的交互器,分别是:
声明交互器的时候需要提供一些列的属性,其中比较重要的属性如下:
Name:交互器的名称。
Type:交换器的类型。
Durability:(交换器)持久化特性,如果启动此特性,则 Broker 重启后交换器依然存在,否则交换器会被删除。
Auto-delete:是否自动删除,如果启用此特性,当最后一个队列解除与交换器的绑定关系,交换器会被删除。
Arguments:可选参数,一般配合插件或者 Broker 的特性使用。
之所以存在 Durability 和 Auto-delete 特性是因为并发所有的场景和用例都要求交互器是持久化的。
Direct 交换器
Direct 类型的交换器基于消息路由键(RoutingKey)把消息传递到队列中。Direct 交换器是消息单播路由的理想实现(当然,用于多播路由也可以),它的工作原理如下:
队列使用路由键 K 绑定到交换器。
当具有路由键 R 的新消息到达交换器的时候,如果 K = R,那么交换器会把消息传递到队列中。
默认交换器
默认交换器(Default Exchange)是一种特殊的 Direct 交互器,它的名称是空字符串(也就是""),它由消息中间件代理预声明,在 RabbitMQ Broker 中,它在 Web 管理界面中的名称是(AMQP default)
。每个新创建的队列都会绑定到默认交换器,路由键就是该队列的队列名,也就是所有的队列都可以通过默认交换器进行消息投递,只需要指定路由键为相应的队列名即可。
Fanout 交换器
Fanout 其实是一个组合单词,fan 也就是扇形,out 就是向外发散的意思,Fanout 交换器可以想象为"扇形"交换器。Fanout 交换器会忽
略路由键,它会路由消息到所有绑定到它的队列。也就是说,如果有 N 个队列绑定到一个 Fanout 交换器,当一个新的消息发布到该 Fanout 交换器,那么这条新消息的一个副本会分发到这 N 个队列中。Fanout 交换器是消息广播路由的理想实现。
Topic 交换器
Topic 交换器基于路由键和绑定队列和交换器的模式进行匹配从而把消息路由到一个或者多个队列。绑定队列和交换器的 Topic 模式(这个模式串其实就是声明绑定时候的路由键,和消息发布的路由键并非同一个)一般使用点号(dot,也就是'.')分隔,例如source.target.key
,绑定模式支持通配符:
符号'#'匹配一个或者多个词,例如:
source.target.#
可以匹配source.target.doge
、source.target.doge.throwable
等等。符号'*'只能匹配一个词,例如:
source.target.*
可以匹配source.target.doge
、source.target.throwable
等等。
对每一条消息,Topic 交换器会遍历所有的绑定关系,检查消息指定的路由键是否匹配绑定关系中的路由键,如果匹配,则将消息推送到相应队列。
Topic 交换器是消息多播路由的理想实现。
Headers 交换器
Headers 交换器是一种不常用的交换器,它使用多个属性进行路由,这些属性一般称为消息头,它不使用路由键进行消息路由。消息头(Message Headers)是消息属性(消息元数据)部分,因此,使用 Headers 交换器在建立队列和交换器的绑定关系的时候需要指定一组键值对,发送消息到 Headers 交换器时候,需要在消息属性中携带一组键值对作为消息头。消息头属性支持匹配规则 x-match 如下:
x-match = all:表示所有的键值对都匹配才能接受到消息。
x-match = any:表示只要存在键值对匹配就能接受到消息。
Headers 交换器也是忽略路由键的,只依赖于消息属性中的消息头进行消息路由。
队列
AMQP 0-9-1 模型中的队列与其他消息或者任务队列系统中的队列非常相似:它们存储应用程序所使用的消息。队列和交换器的基本属性有类似的地方:
Name:队列名称。
Durable:是否持久化,开启持久化意味着消息中间件代理重启后队列依然存在,否则队列会被删除。
Exclusive:是否独占的,开启队列独占特性意味着队列只能被一个连接使用并且连接关闭之后队列会被删除。
Auto-delete:是否自动删除,开启自动删除特性意味着队列至少有一个消费者并且最后一个消费者解除订阅状态(一般是消费者对应的通道关闭)后队列会自动删除。
Arguments:队列参数,一般和消息中间件代理或者插件的特性相关,如消息的过期时间(Message TTL)和队列长度等。
一个队列只有被声明(Declare)了才能使用,也就是队列的第一次声明就是队列的创建操作(因为第一次声明的时候队列并不存在)。如果使用相同的参数再次声明已经存在的队列,那么此次声明会不生效(当然也不会出现异常)。但是如果使用不相同的参数再次声明已经存在的队列,那么会抛出通道级别的异常,异常代码是 406(PRECONDITION_FAILED)。
队列名称
队列名必须由 255 字节(bytes)长度以内的 UTF-8 编码字符组成。实现 AMQP 0-9-1 规范的消息中间件代理具备自动生成随机队列名的功能,也就是在声明队列的时候,队列名指定为空字符串,那么消息中间件代理会自动生成一个队列名,并且在队列声明的返回结果中带上对应的队列名。
以"amq."开头的队列是由消息中间件代理内部生成的,有其特殊的作用,因此不能声明此类名称的新队列,否则会导致通道级别的异常,异常代码为 403(ACCESS_REFUSED)。
队列的持久化特性
持久化的队列会持久化到磁盘中,这种队列在消息中间件代理重启后不会被删除。不开启持久化特性的队列称为瞬时(transient)队列,并非所有的场景都需要开启队列的持久化特性。
队列的持久化特性并不意味着路由到它上面的消息是持久化的,也就是队列的持久化跟消息的持久化是两回事。如果息中间件代理挂了,它重启后会重新声明开启了持久化特性的队列,这些队列中只有使用了消息持久化特性的消息会被恢复。
绑定
绑定(Binding)是交换器路由消息到队列的规则。例如交换器 E 可以路由消息到队列 Q,那么 Q 必须通过一定的规则绑定到 E。绑定中使用的某些交换器的类型决定了它可以使用可选的路由键(RoutingKey)。路由键的作用类似于过滤器,可以筛选某些发布到交换器的消息路由到目标队列。
如果发布的消息没有路由到任意一个目标队列,例如,消息已经发布到交换器,交换器中没有任何绑定,这个时候消息会被丢弃或者返回给发布者,取决于消息发布者发布消息时候使用的参数。
消费者
如果队列只有发布者生产消息,那么是没有意义的,必须有消费者对消息进行使用,或者叫这个操作为消息消费,消息消费的方式有两种:
消息代理中间件向消费者推送消息(推模式,代表方法是
basic.consume
)。消费者主动向消息代理中间件拉取消息(拉模式,代表方法是
basic.get
)。
使用推模式的情况下,消费者必须指定需要订阅的队列。每个队列可以存在多个消费者,或者仅仅注册一个独占的消费者。
每个消费者(订阅者)都有一个称为消费者标签(consumer tag)的标识符,消费者标签是一个字符串。通过消费者标签可以实现取消订阅的操作。
消息确认
消费者应用程序有可能在接收和处理消息的时候崩溃,也有可能因为网络原因导致消息中间件代理投递消息到消费者的时候失败了,这样就会催生一个问题:AMQP 消息中间件代理应该在什么时候从队列中删除消息?因此,AMQP 0-9-1 规范提供了两种选择:
消息中间件代理向应用程序发送消息(使用 AMQP 方法
basic.deliver
或basic.get-ok
)。应用程序收到消息后向消息中间件代理发送确认(使用 AMQP 方法
basic.ack
?<= 个人感觉这个地方少写了basic.nack
和basic.reject
)
前一种称为自动确认模型(动作触发的同时进行了消息确认),后一种称为显式确认模型。显式确认模型中,需要消费者主动向消息中间件代理进行消息主动确认,这个消息主动确认动作的执行时机完全由应用程序控制。消息主动确认有三种方式:积极确认(ack)、消极确认(nack)和拒绝(reject)。
预取消息
预取消息(Prefetching Messages)是一个特性。对于多个消费者共享同一个队列的情况,能够告知消息中间件代理在发送下一个确认之前指定每个消费者一次可以接收消息的消息量。这个特性可以理解为简单的负载均衡技术,在批量发布消息的场景下能够提高吞吐量。
评论