🏆【Alibaba 中间件技术系列】「RocketMQ 技术专题」小白专区之领略一下 RocketMQ 基础之最!
应一些小伙伴们的私信,希望可以介绍一下 RocketMQ 的基础,那么我们现在就从 0 开始,进入 RocketMQ 的基础学习及概念介绍,为学习和使用 RocketMQ 打好基础!
RocketMQ 的定位
RocketMQ 是一款快速地、可靠地、分布式、容易使用的消息中间件,由 Alibaba 开发,其前身是 Metaq,Metaq 可以看成是 linkedin 的 Kafka(scala)的 java 版本,并对其增加了事务的支持。
RocketMQ 的定义
RocketMQ 为 Metaq3.0,相比于原始 kafka,其擅长点出了原始的 log collecting 之外,还增加诸如 HA、事务等特性,使得从功能上可以替代传统大部分 MQ。
RocketMQ 具有特点
可靠的 FIFO 和严格的消息顺序
Pub/Sub 和 P2P 消息模型
单队列容纳百万消息的能力
拉(Pull)和推(push)队列
各种消息协议,如 JMS,MQTT 等
分布式集群,支持容错
Docker images for isolated testing and cloud Isolated clusters
丰富的配置和监控功能的管理
RocketMQ 的基本部件
Topic(订阅主题)
Topic 是一个主题。一个系统中,我们可以将消息划成 Topic ,这样,将不同的消息发送到不同的 queue。
Queue(队列)
一个 topic 下,我们可以设置多个 queue,每个 queue 就是我们平时所说的消息队列;
因为 queue 是完全从属于某个特定的 topic 的,所以当我们要发送消息时,总是要指定该消息所属的 topic 是什么。
通过 equeue 就能知道该 topic 下有几个 queue 了,但是到底发送到哪个 queue 呢?比如 topic 下有 4 个 queue,那对于这个 topic 下的消息,发送时,到底该发送到哪个 queue 呢?
消息被路由的过程
目前,equeue 的做法是在发送一个消息时,需要用户指定这个消息对应的 topic 以及一个用来路由的一个 object 类型的参数。
equeue 会根据 topic 得到所有的 queue,然后根据该 object 参数通过 hash code 然后取模 queue 的个数最后得到要发送的 queue 的编号,从而知道该发送到哪个 queue。
这个路由消息的过程是在发送消息的这一方做的,也就是下面要说的 producer。之所以不在消息服务器上做是因为这样可以让用户自己决定该如何路由消息,具有更大的灵活性。
Producer(生产者)
消息队列的生产者。我们知道,消息队列的本质就是实现了 publish-subscribe 的模式,即生产者-消费者模式。生产者生产消息,消费者消费消息。所以这里的 Producer 就是用来生产和发送消息的。
Consumer
消息队列的消费者,一个消息可以有多个消费者。
Consumer Group
消费者分组,这可能对大家来说是一个新概念。之所以要搞出一个消费者分组,是为了实现下面要说的集群消费。一个消费者分组中包含了一些消费者,如果这些消费者是要集群消费,那这些消费者会平均消费该分组中的消息。
Broker
equeue 中的 broker 负责消息的中转,即接收 producer 发送过来的消息,然后持久化消息到磁盘,然后接收 consumer 发送过来的拉取消息的请求,然后根据请求拉取相应的消息给 consumer。
所以,broker 可以理解为消息队列服务器,提供消息的接收、存储、拉取服务。
broker 对于 equeue 来说是核心,它绝对不能挂,一旦挂了,那 producer,consumer 就无法实现 publish-subscribe 了。
NameServer(命名服务)
客户端寻找 NameServer 地址的方式
代码中指定 NameServer 地址;
java 启动参数中指定 NameServer 地址:-Drocketmq.nameerv.addr
环境变量指定 NameServer 地址:NAMESRV_ADDR
HTTP 静态服务器寻址,客户端启动后,会定时访问一个静态 HTTP 服务器,该 URL 返回 NameServer 地址列表。推荐使用 HTTP 静态服务器寻址方式,对于客户端部署简单,而且 NameServer 集群可以热升级。
消息过滤的方式
简单消息过滤。订阅时指定 topic 下面 tags;
高级消息过滤。
Broker 所在的机器会启动多个 FilterServer 过滤进程;
Consumer 启动后,会向 FilterServer 上传一个过滤的 Java 类;
Consumer 从 FilterServer 拉消息,FilterServer 将请求转发给 Broker,FilterServer 从 Broker 收到消息后,按照 Consumer 上传的 java 过滤程序做过滤,过滤完成后返回给 Consumer。
两种方式的总结:
使用 CPU 资源来换取网卡流量资源;
FilterServer 与 Broker 部署在同一台机器,数据通过本地回环通信,不走网卡;
一台 Broker 部署多个 FilterServer,充分利用 CPU 资源,因为单个 JVM 难以全面利用高配的物理机 CPU 资源;
因为过滤代码使用 Java 编写,应用几乎可以做任意形式的服务器端消息过滤,例如通过 Messgae Header 进行过滤,甚至可以按照 Message Body 进行过滤;
使用 Java 语言进行作为过滤表达式是一个双刃剑,方便了应用的过滤操作,但是带来了服务器端的安全风险。需要应用来保证过滤代码安全,例如在过滤程序中尽可能不做申请大内存,创建线程等操作,避免 Broker 服务器发生资源泄露。
发送消息注意事项
应用尽可能用一个 Topic,消息子类型用 tags 来标识,tags 可以由应用只有设置。只有发送消息设置了 tags,消费方在订阅消息时,才可以利用 tags 在 broker 做消息过滤;
每个消息在业务层面的唯一标识码,要设置到 keys 字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引,应该可以通过 topic、key 来查询这条消息内容以及消息被谁消费,由于哈希索引,请务必保证 key 尽可能唯一。
消息发送成功或者失败,要打印消息日志,务必输出 sendResult 和 key 字段;
send 消息方法,只要不抛异常,就代表发送成功,但是发送成功会有多个状态,在 sendResult 里定义。
发送信息的结果状态
SEND_OK:消息发送成功;
FLUSH_DISK_TIMEOUT:消息发送成功,但是服务器刷盘超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失;
FLUSH_SLAVE_TIMEOUT:消息发送成功,但是服务器同步到 slave 时超时,消息已经进入服务器队列,只有此次服务器宕机,消息才会丢失;
SLAVE_NOT_AVAILABLE:消息发送成功,但是此时 slave 不可用,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失;
消费信息注意事项
集群消费
集群消费是指,一个 consumer group 下的 consumer,平均消费 topic 下的 queue。
假如一个 topic 下有 4 个 queue,然后当前有一个 consumer group,该分组下有 4 个 consumer,那每个 consumer 就被分配到该 topic 下的一个 queue,这样就达到了平均消费 topic 下的 queue 的目的。
如果 consumer group 下只有两个 consumer,那每个 consumer 就消费 2 个 queue。
如果有 3 个 consumer,则第一个消费 2 个 queue,后面两个每个消费一个 queue,从而达到尽量平均消费。
应该尽量让 consumer group 下的 consumer 的数目和 topic 的 queue 的数目一致或成倍数关系。这样每个 consumer 消费的 queue 的数量总是一样的,这样每个 consumer 服务器的压力才会差不多。当前前提是这个 topic 下的每个 queue 里的消息的数量总是差不多多的。这点我们可以对消息根据某个用户自己定义的 key 来进行 hash 路由来保证。
广播消费
广播消费是指一个 consumer 只要订阅了某个 topic 的消息,那它就会收到该 topic 下的所有 queue 里的消息,而不管这个 consumer 的 group 是什么。所以对于广播消费来说,consumer group 没什么实际意义。consumer 可以在实例化时,我们可以指定是集群消费还是广播消费。
对于集群消费和广播消费,消费进度持久化的地方是不同的,集群消费的消费进度是放在 broker,也就是消息队列服务器上的,而广播消费的消费进度是存储在 consumer 本地磁盘上的。
集群消费目的
由于一个 queue 的消费者可能会更换,因为 consumer group 下的 consumer 数量可能会增加或减少,然后就会重新计算每个 consumer 该消费的 queue 是哪些,所以,当出现一个 queue 的 consumer 变动的时候,新的 consumer 如何知道该从哪里开始消费这个 queue 呢?
如果这个 queue 的消费进度是存储在前一个 consumer 服务器上的,那就很难拿到这个消费进度了,因为有可能那个服务器已经挂了,或者下架了,都有可能。而因为 broker 对于所有的 consumer 总是在服务的,所以,在集群消费的情况下,被订阅的 topic 的 queue 的消费位置是存储在 broker 上的,存储的时候按照不同的 consumer group 做隔离,以确保不同的 consumer group 下的 consumer 的消费进度互补影响。
广播消费目的
广播消费,由于不会出现一个 queue 的 consumer 会变动的情况,所以我们没必要让 broker 来保存消费位置,所以是保存在 consumer 自己的服务器上。
消费进度(offset)
消费进度是指,当一个 consumer group 里的 consumer 在消费某个 queue 里的消息时,equeue 是通过记录消费位置(offset)来知道当前消费到哪里了。以便该 consumer 重启后继续从该位置开始消费。
比如一个 topic 有 4 个 queue,一个 consumer group 有 4 个 consumer,则每个 consumer 分配到一个 queue,然后每个 consumer 分别消费自己的 queue 里的消息。
equeue 会分别记录每个 consumer 对其 queue 的消费进度,从而保证每个 consumer 重启后知道下次从哪里开始继续消费。
实际上,也许下次重启后不是由该 consumer 消费该 queue 了,而是由 group 里的其他 consumer 消费了,这样也没关系,因为我们已经记录了这个 queue 的消费位置了。
消费位置和 consumer 其实无关,消费位置完全是 queue 的一个属性,用来记录当前被消费到哪里了。另外一点很重要的是,一个 topic 可以被多个 consumer group 里的 consumer 订阅。
不同 consumer group 里的 consumer 即便是消费同一个 topic 下的同一个 queue,那消费进度也是分开存储的。也就是说,不同的 consumer group 内的 consumer 的消费完全隔离,彼此不受影响。
版权声明: 本文为 InfoQ 作者【浩宇天尚】的原创文章。
原文链接:【http://xie.infoq.cn/article/8f3d09321baacba3568c359a6】。
本文遵守【CC-BY 4.0】协议,转载请保留原文出处及本版权声明。
评论