springBoot 集成 rabbitmq 并实现延时队列

集成rabbitmq
前言
消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题
实现高性能,高可用,可伸缩和最终一致性架构。RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息,具有较高的系统吞吐量、可靠性、消息持久化、免费等优点,在软件项目中具有非常广泛的应用。
项目介绍
本项目以springboot集成rabbitmq,引导如何设计和优雅地集成rabbitmq相关的组件,并实现用死信队列实现延迟消息队列。
项目设计与实战
配置
maven依赖
 <parent>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-parent</artifactId>        <version>2.0.1.RELEASE</version>        <relativePath/>  </parent><dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-amqp</artifactId></dependency>
配置文件
spring.rabbitmq.host=192.168.202.128spring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guest
组件设计与实现
Exchange(交换机)
定义交换机名称、类型、持久化、延时交换机名称等属性。
public interface IRabbitMqExchange {    /**     * Exchange(交换机) 的名称     * */    String exchangeName();    /**     * exchange类型 DIRECT("direct"), FANOUT("fanout"), TOPIC("topic"), HEADERS("headers")     * */    default String type(){return "topic";}    /**     * 是否持久化     */    default boolean durable(){return true;}    /**     * 当所有队列在完成使用此exchange时,是否删除     */    default boolean autoDelete(){return false;}    /**     * 是否允许直接binding     * 如果是true的话 则不允许直接binding到此 exchange     */    default boolean internal(){ return false;}    /**     * 其他的一些参数设置     */    default Map<String, Object> arguments(){ return null; }    /**     * 延时 Exchange     * */    default String delayExchangeName() {return "delay."+exchangeName();}}
路由(Routing)
public interface IRabbitMqRouting {    /**     * rabbitmq路由key     * */    String routingKey();}
队列(Queue)
定义队列名称、持久化、延时队列名称等属性
public interface IRabbitMqQueue {    /**     * Queue(队列)名称     */    String queueName();    /**     * 是否持久化     * */    default boolean durable() {return true;}    /**     * 排他性     * */    default boolean exclusive(){return false;}    /**     * 是否自动删除     * */    default boolean autoDelete(){return false;}    /**     * 其他属性设置     * */    default Map<String, Object> arguments() { return null; }    /**     * 默认的延时队列名称     * */    default String delayQueueName(){return "delay."+this.queueName();}}
绑定关系(Binding)
定义了 交换机(Exchange)-路由(Routing)-消息队列(Queue)的绑定关系,以及定义是否支持延时消息。
public interface IRabbitMqBinding {    /**     * 需要绑定的exchange(交换机)     * */    IRabbitMqExchange exchange();    /**     * 需要绑定的routing(路由)     * */    IRabbitMqRouting routing();    /**     * 需要绑定的queue(队列)     * */    IRabbitMqQueue queue();    /**     * 消息队列是否允许延时     * */    boolean allowDelay();}
默认注册器
实现了交换机、消息队列、绑定关系的注册。如果绑定关系中定义支持延迟消息,则额外注册一个延时交换机和死信队列,以实现延时消息推送的功能。
public class DefaultRabbitMqRegister implements IRabbitMqRegister, SmartLifecycle {    ConnectionFactory connectionFactory;    Channel channel;    public DefaultRabbitMqRegister() {    }    public DefaultRabbitMqRegister(ConnectionFactory connectionFactory) {        this.connectionFactory = connectionFactory;    }    @PostConstruct    public void init() {        channel = connectionFactory.createConnection().createChannel(false);    }    @Override    public void registerExchange(IRabbitMqExchange... exchanges) throws IOException {        for (IRabbitMqExchange exchange : exchanges) {            channel.exchangeDeclare(exchange.exchangeName(), exchange.type(), exchange.durable(), exchange.autoDelete(), exchange.internal(), exchange.arguments());        }    }    @Override    public void registerQueue(IRabbitMqQueue... queues) throws IOException {        for (IRabbitMqQueue queue : queues) {            channel.queueDeclare(queue.queueName(), queue.durable(), queue.exclusive(), queue.autoDelete(), queue.arguments());        }    }    @Override    public void registerBinding(IRabbitMqBinding... bindings) throws IOException {        for (IRabbitMqBinding binding : bindings) {            channel.queueBind(binding.queue().queueName(), binding.exchange().exchangeName(), binding.routing().routingKey());            if (binding.allowDelay()) {                registerDelayBinding(binding);            }        }    }    /**     * 创建一个内部的 死信队列 用来实现 延时队列     */    private void registerDelayBinding(IRabbitMqBinding binding) throws IOException {        IRabbitMqExchange exchange = binding.exchange();        // 注册一个延时的消息交换机        channel.exchangeDeclare(exchange.delayExchangeName(), exchange.type(), exchange.durable(), exchange.autoDelete(), exchange.internal(), exchange.arguments());        // 注册一个死信队列  设置消息超时后,将消息转发到原来的Router队列        IRabbitMqQueue queue = binding.queue();        Map<String, Object> arguments = queue.arguments();        if (arguments == null) {            arguments = new HashMap<>(4);        }        arguments.put("x-dead-letter-exchange", binding.exchange().exchangeName());        arguments.put("x-dead-letter-routing-key", binding.routing().routingKey());        channel.queueDeclare(queue.delayQueueName(), queue.durable(), queue.exclusive(), queue.autoDelete(), arguments);        // 将交换机和队列绑定        channel.queueBind(queue.delayQueueName(), exchange.delayExchangeName(), binding.routing().routingKey());    }    private List<MessageListenerContainer> listenerContainers = new LinkedList<>();    @Override    public void listenerQueue(IRabbitMqListener listener, IRabbitMqQueue... queues) {        String[] queueNames = new String[queues.length];        for (int idx = 0; idx < queues.length; idx++) {            queueNames[idx] = queues[idx].queueName();        }        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);        // 配置手动确认        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);        container.setQueueNames(queueNames);        container.setMessageListener(listener);        listenerContainers.add(container);    }    @Override    public void start() {        for (MessageListenerContainer container : listenerContainers) {            container.start();        }    }    @Override    public void stop() {    }    @Override    public boolean isRunning() {        return false;    }    @Override    public boolean isAutoStartup() {        return true;    }    @Override    public void stop(Runnable runnable) {    }    @Override    public int getPhase() {        return 9999;    }}
消息监听器
public interface IRabbitMqListener {    /**     * 处理rabbitMq的消息     * */    boolean handleMessage(Object obj);}
抽象实现类(具体的消费者继承该抽象类,重写handleMessage()方法,实现消费逻辑)
public abstract class AbstractMessageListener implements ChannelAwareMessageListener, IRabbitMqListener {    private Logger logger = LoggerFactory.getLogger(AbstractMessageListener.class);    private MessageConverter messageConverter = new Jackson2JsonMessageConverter();    @Override    public void onMessage(Message message, Channel channel) throws Exception {        long tag = message.getMessageProperties().getDeliveryTag();        try {            Object obj = messageConverter.fromMessage(message);            boolean handleResult = handleMessage(obj);            if (handleResult) {                channel.basicAck(tag, false);            } else {                logger.error("消息处理失败 message: {}", message);                channel.basicNack(tag, false, false);            }        } catch (Exception e) {            channel.basicNack(tag, false, false);            logger.error("消息处理异常 message: " + message + " " + e.getMessage(), e);        }    }}
消息发送服务类
实现发送消息、发送延时消息等功能
public class RabbitMqServiceImpl implements IRabbitMqService, RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {    private Logger logger = LoggerFactory.getLogger(RabbitMqServiceImpl.class);    @Autowired    protected RabbitTemplate rabbitTemplate;    @PostConstruct    public void init() {        rabbitTemplate.setConfirmCallback(this);        rabbitTemplate.setReturnCallback(this);        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());    }    @Override    public void send(IRabbitMqExchange exchange, IRabbitMqRouting routing, Object msg) {        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());        rabbitTemplate.convertAndSend(exchange.exchangeName(), routing.routingKey(), msg, correlationId);    }    @Override    public void send(IRabbitMqExchange exchange, IRabbitMqRouting routing, Object msg, long delay) {        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());        if (delay > 0) {            MessagePostProcessor processor = (Message message) -> {                message.getMessageProperties().setExpiration(delay + "");                return message;            };            rabbitTemplate.convertAndSend(exchange.delayExchangeName(), routing.routingKey(), msg, processor, correlationId);        } else {            rabbitTemplate.convertAndSend(exchange.exchangeName(), routing.routingKey(), msg, correlationId);        }    }    /**     * 消息发送的回调     *     * @param correlationId 消息Id     * @param ack           是否成功的标示     * @param cause         错误原因     */    @Override    public void confirm(CorrelationData correlationId, boolean ack, String cause) {        if (ack) {            logger.info("消息发送成功 correlationId: {} cause: {}", correlationId, cause);        } else {            logger.error("消息发送失败 correlationId: {} cause: {}", correlationId, cause);        }    }    @Override    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {        logger.info("returnedMessage message: {} replyCode: {} exchange: {} routingKey: {}", message, replyCode, exchange, routingKey);    }}
实战
使用枚举定义消息队列配置
定义测试Exchange:mq.exchange.test
/** * RabbitMq Exchange(交换机)定义 * */public enum RabbitMqExchange implements IRabbitMqExchange {    MQ_EXCHANGE_TEST("mq.exchange.test") ;    private String exchangeName;    @Override    public String exchangeName() {        return this.exchangeName;    }    RabbitMqExchange(String exchangeName){        this.exchangeName = exchangeName;    }}
定义测试Queue:mq.queue.test
public enum RabbitMqQueue implements IRabbitMqQueue {    MQ_QUEUE_TEST("mq.queue.test");    private String queueName;    @Override    public String queueName() {        return this.queueName;    }    RabbitMqQueue(String queueName){        this.queueName = queueName;    }}
定义测试Routing:mq.routing.test
/** * RabbitMq routing(路由定义) * */public enum RabbitMqRouting implements IRabbitMqRouting {    MQ_ROUTING_TEST("mq.routing.test");    private String routingKey;    @Override    public String routingKey() {        return this.routingKey;    }    RabbitMqRouting(String routingKey){        this.routingKey = routingKey;    }}
定义绑定关系:
/** * RabbitMq Exchange(交换机) Routing(路由) Queue(队列) 的绑定关系 * */public enum RabbitMqBinding implements IRabbitMqBinding {    MQ_BINDING_TEST(RabbitMqExchange.MQ_EXCHANGE_TEST,RabbitMqRouting.MQ_ROUTING_TEST,RabbitMqQueue.MQ_QUEUE_TEST,true);    /**     * exchange(交换机)     */    IRabbitMqExchange exchange;    /**     * routing(路由)     */    IRabbitMqRouting routing;    /**     * queue(队列)     */    IRabbitMqQueue queue;    /**     * 是否允许延时     */    boolean allowDelay = false;    RabbitMqBinding(IRabbitMqExchange exchange,IRabbitMqRouting routing,IRabbitMqQueue queue){        this.exchange = exchange;        this.routing = routing;        this.queue = queue;    }    RabbitMqBinding(IRabbitMqExchange exchange,IRabbitMqRouting routing,IRabbitMqQueue queue,boolean allowDelay){        this.exchange = exchange;        this.routing = routing;        this.queue = queue;        this.allowDelay = allowDelay;    }    @Override    public IRabbitMqExchange exchange() {        return this.exchange;    }    @Override    public IRabbitMqRouting routing() {        return this.routing;    }    @Override    public IRabbitMqQueue queue() {        return this.queue;    }    @Override    public boolean allowDelay() {        return this.allowDelay;    }}
测试消费者类
public class TestConsumer extends AbstractMessageListener {    Logger logger = LoggerFactory.getLogger(TestConsumer.class);    @Override    public boolean handleMessage(Object obj) {        logger.info("rabbitmq消费者开始消费,消息内容:" +obj.toString());        return true;    }}
启动项目

登录rabbitmq控制台,已经自动创建了 交换机和延迟交换机,消息队列和死信队列


测试发送消息
@Testpublic void testSendMq(){     logger.info("生产者发送消息到mq");     rabbitMqService.send(RabbitMqExchange.MQ_EXCHANGE_TEST, RabbitMqRouting.MQ_ROUTING_TEST,"测试发送消息"); }


测试发送延时消息(60秒)
 @Testpublic void testSendDelayMq(){    logger.info("生产者发送延迟消息到mq");     rabbitMqService.send(RabbitMqExchange.MQ_EXCHANGE_TEST, RabbitMqRouting.MQ_ROUTING_TEST,"测试发送延时消息60s",60*1000); }


代码获取
这个项目包含很多轮子实例,跪求star
https://github.com/pengziliu/GitHub-code-practice
版权声明: 本文为 InfoQ 作者【生命在于折腾】的原创文章。
原文链接:【http://xie.infoq.cn/article/b434dee0c221d14055a07bb03】。
本文遵守【CC-BY 4.0】协议,转载请保留原文出处及本版权声明。
 
 生命在于折腾
还未添加个人签名 2018.04.25 加入
还未添加个人简介
 
  
  
  
  
  
  
  
  
    
评论