在使用 Pulsar 的过程中,我们常常会对 Nack 的消息进行重试,但是不断地重试也并不是好办法,我们需要让重试过多的消息进入到另一个队列中调用另外的逻辑去处理。这时可以使用 Pulsar 提供的死信队列(Dead Letter Topic),Pulsar 会将重试超过一定次数的消息移到 DLQ 中,让用户能够通过消费 DLQ 对其中的消息进行处理。
RedeliveryTracker
实现 DLQ 首先需要追踪消息重试的次数,在 Broker 端会有一个 RedeliveryTracker 来记录重试次数。如当 Message 被重新分发时,则会利用 RedeliveryTracker 记录某个消息的次数,使其自增一:
@Override
public void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) {
positions.forEach(redeliveryTracker::incrementAndGetRedeliveryCount);
redeliverUnacknowledgedMessages(consumer);
}
复制代码
在每次将消息分发给 Consumer 时,会将 redeliveryCount 重试次数发送给 Client 的 Consumer:
int redeliveryCount = 0;
PositionImpl position = PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
if (redeliveryTracker.contains(position)) {
redeliveryCount = redeliveryTracker.incrementAndGetRedeliveryCount(position);
}
ctx.write(
cnx.newMessageAndIntercept(consumerId, entry.getLedgerId(), entry.getEntryId(), partitionIdx,
redeliveryCount, metadataAndPayload,
batchIndexesAcks == null ? null : batchIndexesAcks.getAckSet(i), topicName),
ctx.voidPromise());
复制代码
当消息被 ack 之后,则会将消息的位置从 RedeliveryTracker 中移除。
在 Broker 中,RedeliveryTracker 目前只有内存实现,也就是维护的重试次数是储存在 Broker 的内存中,如果 Broker 重启,那么正在追踪的重试次数将会丢失。
DeadLetterProducer
在 Consumer 中,则判断 Broker 传递过来的重试次数,如果超过了用户所设置的最大重试次数,那么会创建 DeadLetterProducer,将消息写入到 DLQ 中。
deadLetterProducer.thenAcceptAsync(producerDLQ -> {
for (MessageImpl<T> message : finalDeadLetterMessages) {
String originMessageIdStr = getOriginMessageIdStr(message);
String originTopicNameStr = getOriginTopicNameStr(message);
producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
.value(message.getData())
.properties(getPropertiesMap(message, originMessageIdStr, originTopicNameStr))
.sendAsync()
.thenAccept(messageIdInDLQ -> {
possibleSendToDeadLetterTopicMessages.remove(finalMessageId);
acknowledgeAsync(finalMessageId).whenComplete((v, ex) -> {
if (ex != null) {
log.warn("[{}] [{}] [{}] Failed to acknowledge the message {} of the original topic but send to the DLQ successfully.",
topicName, subscription, consumerName, finalMessageId, ex);
} else {
result.complete(true);
}
});
}).exceptionally(ex -> {
log.warn("[{}] [{}] [{}] Failed to send DLQ message to {} for message id {}",
topicName, subscription, consumerName, finalMessageId, ex);
result.complete(false);
return null;
});
}
}).exceptionally(ex -> {
log.error("Dead letter producer exception with topic: {}", deadLetterPolicy.getDeadLetterTopic(), ex);
deadLetterProducer = null;
result.complete(false);
return null;
});
复制代码
在消息写入成功后,会在 consumer 的 topic 中对原来的消息进行 ack,这样在原先的 topic 中就不会再次消费到该消息了,用户可在 DLQ 中继续进行消费。
但目前的 DLQ 实现中,如果用户没有实现在 DLQ 上创建订阅,可能导致 DeadLetterProducer 写入数据后因为没有订阅又被 Broker 删除,导致数据丢失,这个在实际应用中需要注意,需要提前创建好订阅。Pulsar 有望在后续的版本中修复这一问题。
评论