我就不服了,看完这篇文章,5 大常见消息队列开发你还学不会
前言
小伙伴们,国庆都过的开心吗?国庆后的第一个工作日是不是很多小伙伴还沉浸在假期的心情中,没有工作状态呢?
那小Q今天和大家聊一聊消息中间件吧,不说废话直接开始
相信只要做过开发的朋友基本都是知道,一般来说,大型应用通常会被拆分成多个子系统,这些子系统可能会部署在多台机器上,这样的应用就是分布式应用。而分布式应用的子系统之间并不是完全独立的,它们需要相互通信来共同完成某个功能,这就涉及系统间通信了。
目前,业界通常有两种方式来实现系统间通信
一种是基于远程过程调用的方式;
一种是基于消息队列的方式。
前一种就是我们常说的RPC调用,客户端不需要知道调用的具体实现细节,只需直接调用实际存在于远程计算机上的某个对象即可,但调用方式看起来和调用本地应用程序中的对象一样
但是这个不是重点,重点是第二种方式,也是现在在是市面上被大多数程序员所接受的方式,大多数的公司也在使用这种开发模式——基于消息队列的方式
基于消息队列的方式是指由应用中的某个系统负责发送信息,由关心这条消息的相应系统负责接收消息,并在收到消息后进行各自系统内的业务处理。
消息在被发送后可以立即返回,由消息队列来负责消息的传递,消息发布者只管将消息发布到消息队列而不用管谁来取,消息使用者只管从消息队列中取消息而不管是谁发布的,这样发布者和使用者都不用知道对方的存在
那这样的话,消息队列是如何消费的呢?因为正好处于学习(绝对不是公司老大的“严厉逼迫”)的情况下,我自己实现了一些RocketMQ相关的内容,正好有代码,我就以RocketMQ为例,给大家解释一下吧
消费者组
在正式开始说消费之前,我们首先要明白一个概念,就是消费组
消费者组(Consumer Group)是一类消费者的集合,这类消费者通常消费同一类消息并且消费逻辑一致,所以将这些消费者分组在一起。消费者组与生产者组类似,都是将相同角色的消费者分组在一起并命名的。
分组是一个很精妙的概念设计,RocketMQ正是通过这种分组机制,实现了天然的消息负载均衡。在消费消息时,通过消费者组实现了将消息分发到多个消费者服务器实例
设置消费者的名字是在代码中实现的,如下:
举个例子
某个主题有9条消息,其中一个消费者组有3个实例(3个进程或3台机器),那么每个实例将均摊3条消息,这也意味着我们可以很方便地通过增加机器来实现水平扩展。
如果还不理解的话,我们看下面这张图,由订单系统来的消息,被库存和积分两个组所分配,每个组就是一个消费组
集群模式和广播模式
那同样的,这样也就会产生一个很寻常的问题,对于消费者组而言,当消息传来时,假设有多台服务器,那谁来消费这个消息
这其实是消费的两种模式,集群模式和广播模式。
消息消费模式有两种:集群消费(Clustering)和广播消费(Broadcasting)。
默认是集群消费,在该模式下一个消费者集群共同消费一个主题的多个队列,一个队列只会被一个消费者消费,如果某个消费者挂掉了,分组内的其他消费者会接替挂掉的消费者继续消费。
广播消费会将消息发给消费者组中的每一个消费者进行消费。
当然我们可以手动指定为广播模式。
Push模式和Pull模式
好了,前面铺垫的基础内容结束了,接下来我们就是整体,消费者如何消费生产者的消息
消费者(Consumer)负责消费消息,它从消息服务器拉取消息并将其输入用户应用程序中。
从用户应用的角度来看,消费者有两种类型:拉取型消费者和推送型消费者。
拉取型消费者
拉取型消费者(Pull Consumer)主动从消息服务器拉取消息,只要批量拉取到消息,用户应用就会启动消费过程,所以Pull被称为主动消费类型。
推送型消费者
推送型消费者(Push Consumer)封装了消息的拉取、消费进度和其他内部维护工作,将消息到达时执行的回调接口留给用户应用程序来实现。所以Push被称为被动消费类型。但从实现上看,还是从消息服务器拉取消息的。
不同于Pull 可是,Push首先要注册消费监听器,当监听器被触发后才开始消费消息。
代码实现
篇幅原因,这里我只展示消费者方面的代码,获取完整代码,在文章最后有获取方式
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrently.Context;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrently.status;
import org.apache.rocketmq.client.consumer.listener.MessageListener.Concurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.UnsupportedEncodingException;
import java.util.Date;
import java.util.List;
public class TestRockerMq {
public static void main(String[] args) throws Exception {
//创建一个消息消费者,并设置一个消息消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer( "niwei_consumer_group");
//指定Nameserver地址
consumer.setNamesrvAddr("localhost:9876");
//设置 consumer第一次启动时是从队列头部还是队列尾部开始消费的
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFESET);
//订闷指定Topic 下的所有消息
consumer.subscribe("topic_example_java","*");
//注册消息监听器
consumer.registerMessageListener ((List<MessageExt> list,Consumeconcurrentlycontext context)->{
//默认list里只有一条消息,可以通过设置参数来批量接收消息
if(list != null) {
for (MessageExt ext : list) {
try {
System.out.println(new Date() + new String(ext.getBody(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printstackTrace();
}
}
}
return ConsumeConcurrentlystatus.CONSUME_SUCCESs;
});
//消费者对象在使用之前必须要调用start方法初始化
consumer.start();
System.out.println("消息消费者已启动");
}
}
好了,关于RocketMQ就到这里吧,毕竟我也刚上班,实在是找不到什么状态,要是能直接放链接,我就把我的Git地址直接放这里了,idea我都真的是一点都不想打开啊
不过只有这点东西怎么可以,我也于心不忍啊,来吧,给大家分享一份文档,内容涵盖消息队列、消息协议、RabbitMQ、ActiveMQ、Kafka、RocketMQ,来详细的看一下
基础概念
第1章消息队列
第2章消息协议
源码解析
第3章RabbitMQ
第4章ActiveMQ
第5章Kafka
第6章RocketMQ
其实网上各种理论的东西挺多的了,关于理论,只要你想找,那某度一搜一大堆,可是,真的将源码分享出来的,说实话,不多,而这份文档中的源码都是可以实现的,实现后可以自己留存或者跟我一样,直接同步到Git上,因为公司中的项目指不定会有什么神奇的技术选型,提前准备好到时候就可以直接使用,有的时候能帮你大忙
好啦,关注我,一个脑回路清奇的程序员一枚,个人公众号:Java架构师联盟,每日更新技术好文
版权声明: 本文为 InfoQ 作者【小Q】的原创文章。
原文链接:【http://xie.infoq.cn/article/cc2526e0b520f94d8c0dee2e8】。
本文遵守【CC BY-NC-ND】协议,转载请保留原文出处及本版权声明。
评论