写点什么

RocketMQ Binder 集成消息订阅

作者:周杰伦本人
  • 2022 年 8 月 09 日
    贵州
  • 本文字数:3361 字

    阅读完需:约 11 分钟

RocketMQ Binder 集成消息订阅

AbstractMessageChannelBinder 类中提供了创建 MessageProducer 的协议,在初始化 Binder 的时候加载 createConsumerEndpoint 方法


RocketMQMessageChannelBinder 完成 RocketMQInboundChannelAdapter 的创建和初始化


RocketMQMessageChannelBinder 的 createConsumerEndpoint 方法:


RocketMQInboundChannelAdapter 是适配器,需要适配 Spring Framework 的重试和回调机制,用来订阅消息和转化消息格式。RocketMQListenerBindingContainer 是对 RocketMQ 客户端 API 的封装,适配器中持有它的对象。


RocketMQ 提供两种消费模式:顺序消费和并发消费。RocketMQ 客户端 API 中顺序消费的默认监听器是 DefaultMessageListenerOrderly,并发消费的默认监听器是 DefaultMessageListenerConcurrently 类,无论哪个消费模式,监听器收到的消息都会回调 RocketMQListener


RocketMQInboundChannelAdapter 中创建和初始化 RocketMQListener 的实现类


RocketMQInboundChannelAdapter


DefaultMessageListenerOrderly 收到 RocketMQ 消息后,先回调 BindingRocketMQListener 的 onMessage 方法,再调用 RocketMQInboundChannelAdapter 父类的 sendMessage 方法将消息发送到 DirectChannel


Spring Cloud Stream 的接收消息和发送消息的消息模型是一致的,Binder 中接收的消息先发送到 MessageChannel,由订阅的 MessageChannel 通过 Dispatcher 转发到对应的 MessageHandler 进行处理。


RocketMQInboundChannelAdapter 的父类 MessageProducerSupport 的 getOutputChannel()得到的 MessageChannel 是在初始化 RocketMQ Binder 时传入的 DirectChannel


MessageProducerSupport 的 getOutputChannel 方法:


MessagingTemplate 继承 GenericMessagingTemplate 类,实际执行 doSend()方法发送消息


MessageChannel 的实例是 DirectChannel 对象,复用前面消息发送流程,通过消息分发类 MessageDispatcher 把消息分发给 MessageHandler


DirectChannel 对应的消息处理器是 StreamListenerMessageHandler


InvocableHandlerMethod 使用 java 反射机制完成回调,StreamListenerMessageHandler 与 @


StreamListenerAnnotationBeanPostProcessor 的 afterSingletonsInstantiated 方法:


在 Spring 容器管理的所有单例对象初始化完成之后,遍历 StreamListenerHandlerMethodMapping,进行 InvocableHandlerMethod 和 StreamListenerMessageHandler 的创建和初始化


StreamListenerHandlerMethodMapping 保存了 StreamListener 和 InvocableHandlerMethod 的映射关系,映射关系的创建是在 StreamListenerAnnotationBeanPostProcessor 的 postProcessAfterInitialization()方法


@Overridepublic final Object postProcessAfterInitialization(Object bean, final String beanName)      throws BeansException {   Class<?> targetClass = AopUtils.isAopProxy(bean) ? AopUtils.getTargetClass(bean)         : bean.getClass();   Method[] uniqueDeclaredMethods = ReflectionUtils         .getUniqueDeclaredMethods(targetClass);   for (Method method : uniqueDeclaredMethods) {      StreamListener streamListener = AnnotatedElementUtils            .findMergedAnnotation(method, StreamListener.class);      if (streamListener != null && !method.isBridge()) {         this.streamListenerCallbacks.add(() -> {            Assert.isTrue(method.getAnnotation(Input.class) == null,                  StreamListenerErrorMessages.INPUT_AT_STREAM_LISTENER);            this.doPostProcess(streamListener, method, bean);         });      }   }   return bean;}private void doPostProcess(StreamListener streamListener, Method method,      Object bean) {    streamListener = postProcessAnnotation(streamListener, method);    Optional<StreamListenerSetupMethodOrchestrator> orchestratorOptional;    orchestratorOptional = this.streamListenerSetupMethodOrchestrators.stream()        .filter(t -> t.supports(method)).findFirst();    Assert.isTrue(orchestratorOptional.isPresent(),        "A matching StreamListenerSetupMethodOrchestrator must be present");    StreamListenerSetupMethodOrchestrator streamListenerSetupMethodOrchestrator = orchestratorOptional        .get();    streamListenerSetupMethodOrchestrator        .orchestrateStreamListenerSetupMethod(streamListener, method, bean);  }
@Override public void orchestrateStreamListenerSetupMethod(StreamListener streamListener, Method method, Object bean) { String methodAnnotatedInboundName = streamListener.value();
String methodAnnotatedOutboundName = StreamListenerMethodUtils .getOutboundBindingTargetName(method); int inputAnnotationCount = StreamListenerMethodUtils .inputAnnotationCount(method); int outputAnnotationCount = StreamListenerMethodUtils .outputAnnotationCount(method); boolean isDeclarative = checkDeclarativeMethod(method, methodAnnotatedInboundName, methodAnnotatedOutboundName); StreamListenerMethodUtils.validateStreamListenerMethod(method, inputAnnotationCount, outputAnnotationCount, methodAnnotatedInboundName, methodAnnotatedOutboundName, isDeclarative, streamListener.condition()); if (isDeclarative) { StreamListenerParameterAdapter[] toSlpaArray; toSlpaArray = new StreamListenerParameterAdapter[this.streamListenerParameterAdapters .size()]; Object[] adaptedInboundArguments = adaptAndRetrieveInboundArguments( method, methodAnnotatedInboundName, this.applicationContext, this.streamListenerParameterAdapters.toArray(toSlpaArray)); invokeStreamListenerResultAdapter(method, bean, methodAnnotatedOutboundName, adaptedInboundArguments); } else { registerHandlerMethodOnListenedChannel(method, streamListener, bean); } }
private void registerHandlerMethodOnListenedChannel(Method method, StreamListener streamListener, Object bean) { Assert.hasText(streamListener.value(), "The binding name cannot be null"); if (!StringUtils.hasText(streamListener.value())) { throw new BeanInitializationException( "A bound component name must be specified"); } final String defaultOutputChannel = StreamListenerMethodUtils .getOutboundBindingTargetName(method); if (Void.TYPE.equals(method.getReturnType())) { Assert.isTrue(StringUtils.isEmpty(defaultOutputChannel), "An output channel cannot be specified for a method that does not return a value"); } else { Assert.isTrue(!StringUtils.isEmpty(defaultOutputChannel), "An output channel must be specified for a method that can return a value"); } StreamListenerMethodUtils.validateStreamListenerMessageHandler(method); StreamListenerAnnotationBeanPostProcessor.this.mappedListenerMethods.add( streamListener.value(), new StreamListenerHandlerMethodMapping(bean, method, streamListener.condition(), defaultOutputChannel, streamListener.copyHeaders())); }
复制代码


StreamListenerAnnotationBeanPostProcessor.this.mappedListenerMethods.add 来创建并保存 StreamListenerHandlerMethodMapping


这是使用 Spring Cloud Stream 的消息模型来使用 RocketMQ,也可以使用 SpringBoot 集成的 RocketMQ 组件。

发布于: 刚刚阅读数: 2
用户头像

还未添加个人签名 2020.02.29 加入

公众号《盼盼小课堂》,多平台优质博主

评论

发布
暂无评论
RocketMQ Binder集成消息订阅_8月月更_周杰伦本人_InfoQ写作社区