写点什么

看这篇就够了!RabbitMQ 如何防止数据丢失,详细讲解,linux 教程课后答案孟庆昌

用户头像
极客good
关注
发布于: 刚刚

logger.info("------------> end <------------");


}


@SuppressWarnings("unchecked")


private <T> T byteToObject(byte[] bytes, Class<T> clazz) {


T t;


try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);


ObjectInputStream ois = new ObjectInputStream(bis)) {


t = (T) ois.readObject();


} catch (Exception e) {


e.printStackTrace();


return null;


}


return t;


}


}


复制代码


我这里就简单地打印回调方法返回的消息,在实际项目中,可以把返回的消息存储到日志表中,使用定时任务进行进一步的处理。


我这里是使用 RabbitTemplate 进行发送,所以在 Service 层的 RabbitTemplate 需要设置一下:


@Service


public class RabbitMQServiceImpl implements RabbitMQService {


@Resource


private RabbitmqConfirmCallback rabbitmqConfirmCallback;


@Resource


private RabbitTemplate rabbitTemplate;


@PostConstruct


public void init() {


//指定 ConfirmCallback


rabbitTemplate.setConfirmCallback(rabbitmqConfirmCallback);


//指定 ReturnCallback


rabbitTemplate.setReturnCallback(rabbitmqConfirmCallback);


}


@Override


public String sendMsg(String msg) throws Exception {


Map<String, Object> message = getMessage(msg);


try {


CorrelationData correlationData = (CorrelationData) message.remove("correlationData");


rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING, message, correlationData);


return "ok";


} catch (Exception e) {


e.printStackTrace();


return "error";


}


}


private Map<String, Object> getMessage(String msg) {


String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);


CorrelationData correlationData = new CorrelationData(msgId);


String sendTime = sdf.format(new Date());


Map<String, Object> map = new HashMap<>();


map.put("msgId", msgId);


map.put("sendTime", sendTime);


map.put("msg", msg);


map.put("correlationData", correlationData);


return map;


}


}


复制代码


大功告成!接下来我们进行测试,发送一条消息,我们可以控制台:



假设发送一条信息没有路由匹配到队列,可以看到如下信息:



这就是 confirm 模式。它的作用是为了保障生产者投递消息到 RabbitMQ 不会出现消息丢失


3.2 事务机制(ACK)


=============


最开始的那张图已经讲过,消费者从队列中获取到消息后,会直接确认签收,假设消费者宕机或者程序出现异常,数据没有正常消费,这种情况就会出现数据丢失


所以关键在于把自动签收改成手动签收,正常消费则返回确认签收,如果出现异常,则返回拒绝签收重回队列。



代码怎么实现呢,请看演示:


首先在消费者的 application.yml 文件中设置事务提交为 manual 手动模式:


spring:


rabb


【一线大厂Java面试题解析+核心总结学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


itmq:


listener:


simple:


acknowledge-mode: manual # 手动 ack 模式


concurrency: 1 # 最少消费者数量


max-concurrency: 10 # 最大消费者数量


复制代码


然后编写消费者的监听器:


@Component


public class RabbitDemoConsumer {


enum Action {


//处理成功


SUCCESS,


//可以重试的错误,消息重回队列


RETRY,


//无需重试的错误,拒绝消息,并从队列中删除


REJECT


}


@RabbitHandler


@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC))


public void process(String msg, Message message, Channel channel) {


long tag = message.getMessageProperties().getDeliveryTag();


Action action = Action.SUCCESS;


try {


System.out.println("消费者 RabbitDemoConsumer 从 RabbitMQ 服务端消费消息:" + msg);


if ("bad".equals(msg)) {


throw new IllegalArgumentException("测试:抛出可重回队列的异常");


}


if ("error".equals(msg)) {


throw new Exception("测试:抛出无需重回队列的异常");


}


} catch (IllegalArgumentException e1) {


e1.printStackTrace();


//根据异常的类型判断,设置 action 是可重试的,还是无需重试的


action = Action.RETRY;


} catch (Exception e2) {


//打印异常


e2.printStackTrace();


//根据异常的类型判断,设置 action 是可重试的,还是无需重试的


action = Action.REJECT;


} finally {


try {


if (action == Action.SUCCESS) {


//multiple 表示是否批量处理。true 表示批量 ack 处理小于 tag 的所有消息。false 则处理当前消息


channel.basicAck(tag, false);


} else if (action == Action.RETRY) {


//Nack,拒绝策略,消息重回队列


channel.basicNack(tag, false, true);


} else {


//Nack,拒绝策略,并且从队列中删除


channel.basicNack(tag, false, false);


}


channel.close();


} catch (Exception e) {


e.printStackTrace();


}


}


}


}


复制代码


解释一下上面的代码,如果没有异常,则手动确认回复 RabbitMQ 服务端 basicAck(消费成功)。


如果抛出某些可以重回队列的异常,我们就回复 basicNack 并且设置重回队列。


如果是抛出不可重回队列的异常,就回复 basicNack 并且设置从 RabbitMQ 的队列中删除。


接下来进行测试,发送一条普通的消息"hello":



解释一下 ack 返回的三个方法的意思。


①成功确认


void basicAck(long deliveryTag, boolean multiple) throws IOException;


复制代码


消费者成功处理后调用此方法对消息进行确认。


  • deliveryTag:该消息的 index

  • multiple:是否批量.。true:将一次性 ack 所有小于 deliveryTag 的消息。


②失败确认


void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;


复制代码


  • deliveryTag:该消息的 index。

  • multiple:是否批量。true:将一次性拒绝所有小于 deliveryTag 的消息。

  • requeue:被拒绝的是否重新入队列。


③失败确认


void basicReject(long deliveryTag, boolean requeue) throws IOException;


复制代码


  • deliveryTag:该消息的 index。

  • requeue:被拒绝的是否重新入队列。


basicNack()和 basicReject()的区别在于:basicNack()可以批量拒绝,basicReject()一次只能拒接一条消息


四、遇到的坑


======


4.1 启用 nack 机制后,导致的死循环


====================


上面的代码我故意写了一个 bug。测试发送一条"bad",然后会抛出重回队列的异常。这就有个问题:重回队列后消费者又消费,消费抛出异常又重回队列,就造成了死循环。



那怎么避免这种情况呢?


既然 nack 会造成死循环的话,我提供的一个思路是不使用 basicNack(),把抛出异常的消息落库到一张表中,记录抛出的异常,消息体,消息 Id。通过定时任务去处理


如果你有什么好的解决方案,也可以留言讨论~


4.2 double ack


==============


有的时候比较粗心,不小心开启了自动 Ack 模式,又手动回复了 Ack。那就会报这个错误:


消费者 RabbitDemoConsumer 从 RabbitMQ 服务端消费消息:java 技术爱好者


2020-08-02 22:52:42.148 ERROR 4880 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)


2020-08-02 22:52:43.102 INFO 4880 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@f4a3a8d: tags=[{amq.ctag-8MJeQ7el_PNbVJxGOOw7Rw=rabbitmq.demo.topic}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,5), conn: Proxy@782a1679 Shared Rabbit Connection: SimpleConnection@67c5b175 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 56938], acknowledgeMode=AUTO local queue size=0


复制代码


出现这个错误,可以检查一下 yml 文件是否添加了以下配置:


spring:


rabbitmq:


listener:


simple:


acknowledge-mode: manual


concurrency: 1


max-concurrency: 10


复制代码


如果上面这个配置已经添加了,还是报错,**有可能你使用 @Configuration 配置了


SimpleRabbitListenerContainerFactory,根据 SpringBoot 的特性,代码优于配置,代码的配置覆盖了 yml 的配置,并且忘记设置手动 manual 模式**:


@Bean


public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {


SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();


factory.setConnectionFactory(connectionFactory);


//设置手动 ack 模式

用户头像

极客good

关注

还未添加个人签名 2021.03.18 加入

还未添加个人简介

评论

发布
暂无评论
看这篇就够了!RabbitMQ如何防止数据丢失,详细讲解,linux教程课后答案孟庆昌