看这篇就够了!RabbitMQ 如何防止数据丢失,详细讲解,linux 教程课后答案孟庆昌
logger.info("------------> end <------------");
}
@SuppressWarnings("unchecked")
private <T> T byteToObject(byte[] bytes, Class<T> clazz) {
T t;
try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
ObjectInputStream ois = new ObjectInputStream(bis)) {
t = (T) ois.readObject();
} catch (Exception e) {
e.printStackTrace();
return null;
}
return t;
}
}
复制代码
我这里就简单地打印回调方法返回的消息,在实际项目中,可以把返回的消息存储到日志表中,使用定时任务进行进一步的处理。
我这里是使用 RabbitTemplate 进行发送,所以在 Service 层的 RabbitTemplate 需要设置一下:
@Service
public class RabbitMQServiceImpl implements RabbitMQService {
@Resource
private RabbitmqConfirmCallback rabbitmqConfirmCallback;
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
//指定 ConfirmCallback
rabbitTemplate.setConfirmCallback(rabbitmqConfirmCallback);
//指定 ReturnCallback
rabbitTemplate.setReturnCallback(rabbitmqConfirmCallback);
}
@Override
public String sendMsg(String msg) throws Exception {
Map<String, Object> message = getMessage(msg);
try {
CorrelationData correlationData = (CorrelationData) message.remove("correlationData");
rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING, message, correlationData);
return "ok";
} catch (Exception e) {
e.printStackTrace();
return "error";
}
}
private Map<String, Object> getMessage(String msg) {
String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
CorrelationData correlationData = new CorrelationData(msgId);
String sendTime = sdf.format(new Date());
Map<String, Object> map = new HashMap<>();
map.put("msgId", msgId);
map.put("sendTime", sendTime);
map.put("msg", msg);
map.put("correlationData", correlationData);
return map;
}
}
复制代码
大功告成!接下来我们进行测试,发送一条消息,我们可以控制台:
假设发送一条信息没有路由匹配到队列,可以看到如下信息:
这就是 confirm 模式。它的作用是为了保障生产者投递消息到 RabbitMQ 不会出现消息丢失。
3.2 事务机制(ACK)
=============
最开始的那张图已经讲过,消费者从队列中获取到消息后,会直接确认签收,假设消费者宕机或者程序出现异常,数据没有正常消费,这种情况就会出现数据丢失。
所以关键在于把自动签收改成手动签收,正常消费则返回确认签收,如果出现异常,则返回拒绝签收重回队列。
代码怎么实现呢,请看演示:
首先在消费者的 application.yml 文件中设置事务提交为 manual 手动模式:
spring:
rabb
itmq:
listener:
simple:
acknowledge-mode: manual # 手动 ack 模式
concurrency: 1 # 最少消费者数量
max-concurrency: 10 # 最大消费者数量
复制代码
然后编写消费者的监听器:
@Component
public class RabbitDemoConsumer {
enum Action {
//处理成功
SUCCESS,
//可以重试的错误,消息重回队列
RETRY,
//无需重试的错误,拒绝消息,并从队列中删除
REJECT
}
@RabbitHandler
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC))
public void process(String msg, Message message, Channel channel) {
long tag = message.getMessageProperties().getDeliveryTag();
Action action = Action.SUCCESS;
try {
System.out.println("消费者 RabbitDemoConsumer 从 RabbitMQ 服务端消费消息:" + msg);
if ("bad".equals(msg)) {
throw new IllegalArgumentException("测试:抛出可重回队列的异常");
}
if ("error".equals(msg)) {
throw new Exception("测试:抛出无需重回队列的异常");
}
} catch (IllegalArgumentException e1) {
e1.printStackTrace();
//根据异常的类型判断,设置 action 是可重试的,还是无需重试的
action = Action.RETRY;
} catch (Exception e2) {
//打印异常
e2.printStackTrace();
//根据异常的类型判断,设置 action 是可重试的,还是无需重试的
action = Action.REJECT;
} finally {
try {
if (action == Action.SUCCESS) {
//multiple 表示是否批量处理。true 表示批量 ack 处理小于 tag 的所有消息。false 则处理当前消息
channel.basicAck(tag, false);
} else if (action == Action.RETRY) {
//Nack,拒绝策略,消息重回队列
channel.basicNack(tag, false, true);
} else {
//Nack,拒绝策略,并且从队列中删除
channel.basicNack(tag, false, false);
}
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
复制代码
解释一下上面的代码,如果没有异常,则手动确认回复 RabbitMQ 服务端 basicAck(消费成功)。
如果抛出某些可以重回队列的异常,我们就回复 basicNack 并且设置重回队列。
如果是抛出不可重回队列的异常,就回复 basicNack 并且设置从 RabbitMQ 的队列中删除。
接下来进行测试,发送一条普通的消息"hello":
解释一下 ack 返回的三个方法的意思。
①成功确认
void basicAck(long deliveryTag, boolean multiple) throws IOException;
复制代码
消费者成功处理后调用此方法对消息进行确认。
deliveryTag:该消息的 index
multiple:是否批量.。true:将一次性 ack 所有小于 deliveryTag 的消息。
②失败确认
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
复制代码
deliveryTag:该消息的 index。
multiple:是否批量。true:将一次性拒绝所有小于 deliveryTag 的消息。
requeue:被拒绝的是否重新入队列。
③失败确认
void basicReject(long deliveryTag, boolean requeue) throws IOException;
复制代码
deliveryTag:该消息的 index。
requeue:被拒绝的是否重新入队列。
basicNack()和 basicReject()的区别在于:basicNack()可以批量拒绝,basicReject()一次只能拒接一条消息。
四、遇到的坑
======
4.1 启用 nack 机制后,导致的死循环
====================
上面的代码我故意写了一个 bug。测试发送一条"bad",然后会抛出重回队列的异常。这就有个问题:重回队列后消费者又消费,消费抛出异常又重回队列,就造成了死循环。
那怎么避免这种情况呢?
既然 nack 会造成死循环的话,我提供的一个思路是不使用 basicNack(),把抛出异常的消息落库到一张表中,记录抛出的异常,消息体,消息 Id。通过定时任务去处理。
如果你有什么好的解决方案,也可以留言讨论~
4.2 double ack
==============
有的时候比较粗心,不小心开启了自动 Ack 模式,又手动回复了 Ack。那就会报这个错误:
消费者 RabbitDemoConsumer 从 RabbitMQ 服务端消费消息:java 技术爱好者
2020-08-02 22:52:42.148 ERROR 4880 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
2020-08-02 22:52:43.102 INFO 4880 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@f4a3a8d: tags=[{amq.ctag-8MJeQ7el_PNbVJxGOOw7Rw=rabbitmq.demo.topic}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,5), conn: Proxy@782a1679 Shared Rabbit Connection: SimpleConnection@67c5b175 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 56938], acknowledgeMode=AUTO local queue size=0
复制代码
出现这个错误,可以检查一下 yml 文件是否添加了以下配置:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
concurrency: 1
max-concurrency: 10
复制代码
如果上面这个配置已经添加了,还是报错,**有可能你使用 @Configuration 配置了
SimpleRabbitListenerContainerFactory,根据 SpringBoot 的特性,代码优于配置,代码的配置覆盖了 yml 的配置,并且忘记设置手动 manual 模式**:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
//设置手动 ack 模式
评论