
Spring Cloud Stream: Sending Messages

Author: 周杰伦本人
Published: August 8, 2022



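Sending a message from business code

Business code sends a message by calling source.output().send(message). Source declares the output channel, and output() returns a MessageChannel: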


public interface Source {

    /**
     * Name of the output channel.
     */
    String OUTPUT = "output";

    /**
     * @return output channel
     */
    @Output(Source.OUTPUT)
    MessageChannel output();

}


@FunctionalInterface
public interface MessageChannel {

    long INDEFINITE_TIMEOUT = -1;

    default boolean send(Message<?> message) {
        return send(message, INDEFINITE_TIMEOUT);
    }

    boolean send(Message<?> message, long timeout);

}


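AbstractMessageChannel is the base implementation of a message channel. It provides the common send logic and leaves the actual delivery to the abstract doSend() method, which subclasses implement.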
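
The split between a shared send() and a subclass-specific doSend() is a template method. Here is a minimal JDK-only sketch of that shape. It is not the Spring Integration source: SketchChannel and RecordingChannel are hypothetical names, and messages are plain Strings rather than Message<?> objects.

```java
import java.util.ArrayList;
import java.util.List;

class TemplateMethodDemo {
    public static void main(String[] args) {
        RecordingChannel channel = new RecordingChannel();
        boolean sent = channel.send("order-created");
        // The one-argument send() reaches doSend() with the indefinite timeout.
        System.out.println(sent + " " + channel.delivered + " timeout=" + channel.lastTimeout);
    }
}

/** Hypothetical stand-in for a channel base class (assumption, not the real API). */
abstract class SketchChannel {
    static final long INDEFINITE_TIMEOUT = -1;

    /** Convenience overload: delegates with the indefinite timeout. */
    final boolean send(String message) {
        return send(message, INDEFINITE_TIMEOUT);
    }

    /** Common entry point: validate once here, then let the subclass deliver. */
    final boolean send(String message, long timeout) {
        if (message == null) {
            throw new IllegalArgumentException("message must not be null");
        }
        return doSend(message, timeout);
    }

    /** The only part a concrete channel has to supply. */
    protected abstract boolean doSend(String message, long timeout);
}

/** Records what it was asked to deliver so the delegation is visible. */
class RecordingChannel extends SketchChannel {
    final List<String> delivered = new ArrayList<>();
    long lastTimeout;

    @Override
    protected boolean doSend(String message, long timeout) {
        lastTimeout = timeout;
        return delivered.add(message);
    }
}
```

Keeping validation in the base class means every concrete channel gets the same checks, while each subclass only decides how a message is actually delivered.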

The doSend() method of AbstractSubscribableChannel

For a subscribable channel, the message arrives in AbstractSubscribableChannel.doSend():


public abstract class AbstractSubscribableChannel extends AbstractMessageChannel
        implements SubscribableChannel, SubscribableChannelManagement {

    @Override
    protected boolean doSend(Message<?> message, long timeout) {
        try {
            return getRequiredDispatcher().dispatch(message);
        }
        catch (MessageDispatchingException e) {
            String description = e.getMessage() + " for channel '" + this.getFullChannelName() + "'.";
            throw new MessageDeliveryException(message, description, e);
        }
    }

    private MessageDispatcher getRequiredDispatcher() {
        MessageDispatcher dispatcher = getDispatcher();
        Assert.state(dispatcher != null, "'dispatcher' must not be null");
        return dispatcher;
    }

    protected abstract MessageDispatcher getDispatcher();

}


doSend() calls getDispatcher(), which DirectChannel implements by returning a UnicastingDispatcher (an implementation of MessageDispatcher). It then calls dispatch() to hand the message to the subscribed MessageHandlers.

The doDispatch() method of UnicastingDispatcher

UnicastingDispatcher.doDispatch() looks like this:


private boolean doDispatch(Message<?> message) {
    if (tryOptimizedDispatch(message)) {
        return true;
    }
    boolean success = false;
    Iterator<MessageHandler> handlerIterator = this.getHandlerIterator(message);
    if (!handlerIterator.hasNext()) {
        throw new MessageDispatchingException(message, "Dispatcher has no subscribers");
    }
    List<RuntimeException> exceptions = new ArrayList<RuntimeException>();
    while (!success && handlerIterator.hasNext()) {
        MessageHandler handler = handlerIterator.next();
        try {
            handler.handleMessage(message);
            success = true; // we have a winner.
        }
        catch (Exception e) {
            RuntimeException runtimeException = IntegrationUtils.wrapInDeliveryExceptionIfNecessary(message,
                    () -> "Dispatcher failed to deliver Message", e);
            exceptions.add(runtimeException);
            this.handleExceptions(exceptions, message, !handlerIterator.hasNext());
        }
    }
    return success;
}


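doDispatch() iterates over the subscribed MessageHandlers and calls handleMessage() on each in turn, stopping at the first handler that succeeds. So where do these MessageHandlers come from?


When AbstractMessageChannelBinder initializes a Binding, it creates and initializes a SendingHandler and adds it to the channel's handler list by calling subscribe().


This binding step is triggered by AbstractBindingLifecycle after the Spring container has loaded and initialized all beans.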
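
The subscribe-then-dispatch flow described above can be sketched with the JDK alone. This is a simplified model under stated assumptions, not the Spring Integration implementation: SketchUnicastDispatcher is a hypothetical name, handlers are plain Consumer<String> instances, and the error handling is reduced to collecting failures and rethrowing once every handler has failed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class UnicastDispatchDemo {
    public static void main(String[] args) {
        SketchUnicastDispatcher dispatcher = new SketchUnicastDispatcher();
        List<String> received = new ArrayList<>();

        // Roughly what happens at binding time: handlers are subscribed up front.
        dispatcher.subscribe(message -> {
            throw new IllegalStateException("broker unavailable");
        });
        dispatcher.subscribe(received::add);

        // The first handler fails, so the dispatcher falls through to the second.
        boolean delivered = dispatcher.dispatch("order-created");
        System.out.println(delivered + " " + received);
    }
}

/** Hypothetical unicast dispatcher: exactly one handler gets each message. */
class SketchUnicastDispatcher {
    private final List<Consumer<String>> handlers = new ArrayList<>();

    void subscribe(Consumer<String> handler) {
        handlers.add(handler);
    }

    boolean dispatch(String message) {
        if (handlers.isEmpty()) {
            throw new IllegalStateException("Dispatcher has no subscribers");
        }
        List<RuntimeException> failures = new ArrayList<>();
        for (Consumer<String> handler : handlers) {
            try {
                handler.accept(message);
                return true; // first success wins; later handlers are not called
            } catch (RuntimeException e) {
                failures.add(e); // remember the failure and try the next handler
            }
        }
        // Every handler failed: surface all failures to the caller.
        IllegalStateException aggregate =
                new IllegalStateException("all " + failures.size() + " handlers failed");
        failures.forEach(aggregate::addSuppressed);
        throw aggregate;
    }
}
```

The point of the model is the early return: a unicast dispatcher delivers each message to one handler only, and the remaining handlers act as fallbacks.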

How RocketMQMessageChannelBinder plugs in message sending

AbstractMessageChannelBinder defines the contract for creating a MessageHandler: the createProducerMessageHandler() method, which is called while the binder sets up a producer binding.


RocketMQMessageChannelBinder extends AbstractMessageChannelBinder and creates and initializes the RocketMQMessageHandler. RocketMQMessageHandler is the concrete MessageHandler implementation; its job in the RocketMQ binder is to convert the message format and send the message.


createProducerMessageHandler() in RocketMQMessageChannelBinder is the method that creates this MessageHandler:


@Override
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
        ExtendedProducerProperties<RocketMQProducerProperties> producerProperties,
        MessageChannel channel, MessageChannel errorChannel) throws Exception {
    if (producerProperties.getExtension().getEnabled()) {

        // if producerGroup is empty, using destination
        String extendedProducerGroup = producerProperties.getExtension().getGroup();
        String producerGroup = StringUtils.isEmpty(extendedProducerGroup)
                ? destination.getName() : extendedProducerGroup;

        RocketMQBinderConfigurationProperties mergedProperties = RocketMQBinderUtils
                .mergeProperties(rocketBinderConfigurationProperties, rocketMQProperties);

        RocketMQTemplate rocketMQTemplate;
        if (producerProperties.getExtension().getTransactional()) {
            Map<String, RocketMQTemplate> rocketMQTemplates = getBeanFactory()
                    .getBeansOfType(RocketMQTemplate.class);
            if (rocketMQTemplates.size() == 0) {
                throw new IllegalStateException(
                        "there is no RocketMQTemplate in Spring BeanFactory");
            }
            else if (rocketMQTemplates.size() > 1) {
                throw new IllegalStateException(
                        "there is more than 1 RocketMQTemplates in Spring BeanFactory");
            }
            rocketMQTemplate = rocketMQTemplates.values().iterator().next();
        }
        else {
            rocketMQTemplate = new RocketMQTemplate();
            rocketMQTemplate.setObjectMapper(this.getApplicationContext()
                    .getBeansOfType(ObjectMapper.class).values().iterator().next());
            // initialize the DefaultMQProducer
            DefaultMQProducer producer;
            String ak = mergedProperties.getAccessKey();
            String sk = mergedProperties.getSecretKey();
            if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
                RPCHook rpcHook = new AclClientRPCHook(
                        new SessionCredentials(ak, sk));
                producer = new DefaultMQProducer(producerGroup, rpcHook,
                        mergedProperties.isEnableMsgTrace(),
                        mergedProperties.getCustomizedTraceTopic());
                producer.setVipChannelEnabled(false);
                producer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook,
                        destination.getName() + "|" + UtilAll.getPid()));
            }
            else {
                producer = new DefaultMQProducer(producerGroup);
                producer.setVipChannelEnabled(
                        producerProperties.getExtension().getVipChannelEnabled());
            }
            producer.setNamesrvAddr(mergedProperties.getNameServer());
            producer.setSendMsgTimeout(
                    producerProperties.getExtension().getSendMessageTimeout());
            producer.setRetryTimesWhenSendFailed(
                    producerProperties.getExtension().getRetryTimesWhenSendFailed());
            producer.setRetryTimesWhenSendAsyncFailed(producerProperties
                    .getExtension().getRetryTimesWhenSendAsyncFailed());
            producer.setCompressMsgBodyOverHowmuch(producerProperties.getExtension()
                    .getCompressMessageBodyThreshold());
            producer.setRetryAnotherBrokerWhenNotStoreOK(
                    producerProperties.getExtension().isRetryNextServer());
            producer.setMaxMessageSize(
                    producerProperties.getExtension().getMaxMessageSize());
            rocketMQTemplate.setProducer(producer);
            if (producerProperties.isPartitioned()) {
                rocketMQTemplate
                        .setMessageQueueSelector(new PartitionMessageQueueSelector());
            }
        }

        RocketMQMessageHandler messageHandler = new RocketMQMessageHandler(
                rocketMQTemplate, destination.getName(), producerGroup,
                producerProperties.getExtension().getTransactional(),
                instrumentationManager, producerProperties,
                ((AbstractMessageChannel) channel).getChannelInterceptors().stream()
                        .filter(channelInterceptor -> channelInterceptor instanceof MessageConverterConfigurer.PartitioningInterceptor)
                        .map(channelInterceptor -> ((MessageConverterConfigurer.PartitioningInterceptor) channelInterceptor))
                        .findFirst().orElse(null));
        messageHandler.setBeanFactory(this.getApplicationContext().getBeanFactory());
        messageHandler.setSync(producerProperties.getExtension().getSync());
        messageHandler.setHeaderMapper(createHeaderMapper(producerProperties));
        if (errorChannel != null) {
            messageHandler.setSendFailureChannel(errorChannel);
        }
        return messageHandler;
    }
    else {
        throw new RuntimeException("Binding for channel " + destination.getName()
                + " has been disabled, message can't be delivered");
    }
}


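RocketMQMessageHandler holds a RocketMQTemplate, which wraps the RocketMQ client API.


DefaultMQProducer is an API provided by the RocketMQ client; it does the actual sending of messages to the RocketMQ broker.


RocketMQMessageHandler contains the send logic: it reads parameters from the headers of the Message and calls the corresponding send method on RocketMQTemplate.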
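
Going back to createProducerMessageHandler(), two of its decisions are easy to miss in the long listing: the producer group falls back to the destination name when none is configured, and a transactional producer requires exactly one RocketMQTemplate bean. The JDK-only sketch below isolates both rules. ProducerSetupSketch and its method names are hypothetical, and a plain Map stands in for the bean lookup result.

```java
import java.util.Map;

class ProducerSetupDemo {
    public static void main(String[] args) {
        // No group configured: the destination name is used instead.
        System.out.println(ProducerSetupSketch.resolveProducerGroup("", "order-topic"));
        // Group configured: it takes precedence.
        System.out.println(ProducerSetupSketch.resolveProducerGroup("order-group", "order-topic"));
        // Exactly one candidate bean: it is returned.
        System.out.println(ProducerSetupSketch.requireSingle(Map.of("rocketMQTemplate", "template-bean")));
    }
}

/** Hypothetical helper mirroring two rules from the listing above. */
class ProducerSetupSketch {

    /** Use the configured group, or the destination name when the group is empty. */
    static String resolveProducerGroup(String configuredGroup, String destinationName) {
        return (configuredGroup == null || configuredGroup.isEmpty())
                ? destinationName
                : configuredGroup;
    }

    /** Return the only bean in the map; reject zero or several candidates. */
    static <T> T requireSingle(Map<String, T> beansByName) {
        if (beansByName.isEmpty()) {
            throw new IllegalStateException("there is no RocketMQTemplate in Spring BeanFactory");
        }
        if (beansByName.size() > 1) {
            throw new IllegalStateException(
                    "there is more than 1 RocketMQTemplates in Spring BeanFactory");
        }
        return beansByName.values().iterator().next();
    }
}
```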

The handleMessageInternal() method of RocketMQMessageHandler

RocketMQMessageHandler processes each message in handleMessageInternal():


protected void handleMessageInternal(org.springframework.messaging.Message<?> message)
        throws Exception {
    try {
        // issue 737 fix
        Map<String, String> jsonHeaders = headerMapper
                .fromHeaders(message.getHeaders());
        message = org.springframework.messaging.support.MessageBuilder
                .fromMessage(message).copyHeaders(jsonHeaders).build();

        final StringBuilder topicWithTags = new StringBuilder(destination);
        String tags = Optional
                .ofNullable(message.getHeaders().get(RocketMQHeaders.TAGS)).orElse("")
                .toString();
        if (!StringUtils.isEmpty(tags)) {
            topicWithTags.append(":").append(tags);
        }

        SendResult sendRes = null;
        // send a transactional message
        if (transactional) {
            sendRes = rocketMQTemplate.sendMessageInTransaction(groupName,
                    topicWithTags.toString(), message, message.getHeaders()
                            .get(RocketMQBinderConstants.ROCKET_TRANSACTIONAL_ARG));
            log.debug("transactional send to topic " + topicWithTags + " " + sendRes);
        }
        else {
            // read the delay level used for delayed (scheduled) messages
            int delayLevel = 0;
            try {
                Object delayLevelObj = message.getHeaders()
                        .getOrDefault(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 0);
                if (delayLevelObj instanceof Number) {
                    delayLevel = ((Number) delayLevelObj).intValue();
                }
                else if (delayLevelObj instanceof String) {
                    delayLevel = Integer.parseInt((String) delayLevelObj);
                }
            }
            catch (Exception e) {
                // ignore
            }
            boolean needSelectQueue = message.getHeaders()
                    .containsKey(BinderHeaders.PARTITION_HEADER);
            // synchronous send
            if (sync) {
                // ordered message
                if (needSelectQueue) {
                    sendRes = rocketMQTemplate.syncSendOrderly(
                            topicWithTags.toString(), message, "",
                            rocketMQTemplate.getProducer().getSendMsgTimeout());
                }
                // normal message
                else {
                    sendRes = rocketMQTemplate.syncSend(topicWithTags.toString(),
                            message,
                            rocketMQTemplate.getProducer().getSendMsgTimeout(),
                            delayLevel);
                }
                log.debug("sync send to topic " + topicWithTags + " " + sendRes);
            }
            // asynchronous send
            else {
                Message<?> finalMessage = message;
                SendCallback sendCallback = new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        log.debug("async send to topic " + topicWithTags + " "
                                + sendResult);
                    }

                    @Override
                    public void onException(Throwable e) {
                        log.error("RocketMQ Message hasn't been sent. Caused by "
                                + e.getMessage());
                        if (getSendFailureChannel() != null) {
                            getSendFailureChannel().send(
                                    RocketMQMessageHandler.this.errorMessageStrategy
                                            .buildErrorMessage(new MessagingException(
                                                    finalMessage, e), null));
                        }
                    }
                };
                if (needSelectQueue) {
                    rocketMQTemplate.asyncSendOrderly(topicWithTags.toString(),
                            message, "", sendCallback,
                            rocketMQTemplate.getProducer().getSendMsgTimeout());
                }
                else {
                    rocketMQTemplate.asyncSend(topicWithTags.toString(), message,
                            sendCallback);
                }
            }
        }
        if (sendRes != null && !sendRes.getSendStatus().equals(SendStatus.SEND_OK)) {
            if (getSendFailureChannel() != null) {
                this.getSendFailureChannel().send(message);
            }
            else {
                throw new MessagingException(message,
                        new MQClientException("message hasn't been sent", null));
            }
        }
    }
    catch (Exception e) {
        log.error("RocketMQ Message hasn't been sent. Caused by " + e.getMessage());
        if (getSendFailureChannel() != null) {
            getSendFailureChannel().send(this.errorMessageStrategy
                    .buildErrorMessage(new MessagingException(message, e), null));
        }
        else {
            throw new MessagingException(message, e);
        }
    }
}


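The code is long, but the overall flow is easy to follow:


  1. Build the destination: the topic, with ":" and the tags appended when the message carries a tags header. This is the topicWithTags variable in the code.

  2. If the handler is transactional, send a transactional message.

  3. Otherwise, read the delay level for delayed messages and check whether the send is synchronous. A synchronous send is either an ordered message or a normal message, and an asynchronous send is likewise either ordered or normal. In this code only the synchronous normal send passes the delay level on.

  4. Check the send result. If the send failed, the message goes to the send failure channel when one is configured; otherwise a MessagingException is thrown.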
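
The branching in steps 2 and 3 can be condensed into one small decision function. The sketch below is a JDK-only model of that branching, not the binder's code: SendMode and SendModeSketch are hypothetical names, and the three booleans stand for the handler's transactional flag, its sync flag, and whether the message carries the partition header.

```java
class SendModeDemo {
    public static void main(String[] args) {
        // The transactional flag is checked first, so it wins over the other inputs.
        System.out.println(SendModeSketch.choose(true, true, true));
        // Non-transactional, synchronous, with a partition header: ordered sync send.
        System.out.println(SendModeSketch.choose(false, true, true));
        // Non-transactional, asynchronous, no partition header: plain async send.
        System.out.println(SendModeSketch.choose(false, false, false));
    }
}

/** The five send paths reachable in the method above. */
enum SendMode { TRANSACTIONAL, SYNC_ORDERLY, SYNC, ASYNC_ORDERLY, ASYNC }

/** Hypothetical helper that mirrors the if/else nesting of the handler. */
class SendModeSketch {
    static SendMode choose(boolean transactional, boolean sync, boolean hasPartitionHeader) {
        if (transactional) {
            return SendMode.TRANSACTIONAL;
        }
        if (sync) {
            return hasPartitionHeader ? SendMode.SYNC_ORDERLY : SendMode.SYNC;
        }
        return hasPartitionHeader ? SendMode.ASYNC_ORDERLY : SendMode.ASYNC;
    }
}
```

Written this way it is easier to see that the three inputs produce only five outcomes, because the sync flag and the partition header are ignored once the handler is transactional.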


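The tags, the delay level, and ordered sending are driven by headers on the Message, which are set when the Message is built in business code. Transactional and synchronous sending come from the producer properties that createProducerMessageHandler() passes to the handler.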
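
To make the header handling concrete, here is a JDK-only sketch of how the destination string and the delay level are derived from a header map. It models the logic of handleMessageInternal() with a plain Map. HeaderSketch is a hypothetical name, and the keys "TAGS" and "DELAY" are placeholders chosen for this example; the real keys are the RocketMQHeaders.TAGS and MessageConst.PROPERTY_DELAY_TIME_LEVEL constants.

```java
import java.util.HashMap;
import java.util.Map;

class HeaderDrivenSendDemo {
    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(HeaderSketch.TAGS_HEADER, "paid");
        headers.put(HeaderSketch.DELAY_HEADER, "3"); // a String value is parsed as well

        System.out.println(HeaderSketch.topicWithTags("order-topic", headers));
        System.out.println(HeaderSketch.delayLevel(headers));
    }
}

/** Hypothetical helper; the header keys below are placeholders, not the real constants. */
class HeaderSketch {
    static final String TAGS_HEADER = "TAGS";
    static final String DELAY_HEADER = "DELAY";

    /** Returns "topic" or "topic:tags", depending on whether a tags header is present. */
    static String topicWithTags(String topic, Map<String, Object> headers) {
        Object tags = headers.get(TAGS_HEADER);
        String tagText = tags == null ? "" : tags.toString();
        return tagText.isEmpty() ? topic : topic + ":" + tagText;
    }

    /** Accepts a Number or a numeric String; anything else means "no delay" (0). */
    static int delayLevel(Map<String, Object> headers) {
        Object value = headers.getOrDefault(DELAY_HEADER, 0);
        if (value instanceof Number) {
            return ((Number) value).intValue();
        }
        if (value instanceof String) {
            try {
                return Integer.parseInt((String) value);
            } catch (NumberFormatException e) {
                return 0; // same effect as the "ignore" catch block in the handler
            }
        }
        return 0;
    }
}
```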

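Summary

This article walked through the basic flow of sending a message with Spring Cloud Stream. Business code sends the message, which reaches the doSend() method of AbstractSubscribableChannel. doSend() calls the UnicastingDispatcher, whose doDispatch() method hands the message to the subscribed MessageHandlers. RocketMQMessageHandler is one of them: it uses the producer settings and the message headers to decide which kind of message to send, sends it, and routes messages that fail to send to the send failure channel.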
