RocketMQ Binder: Integrating Message Subscription
The AbstractMessageChannelBinder class defines the contract for creating a MessageProducer; its createConsumerEndpoint method is invoked while the Binder is being initialized.
RocketMQMessageChannelBinder creates and initializes the RocketMQInboundChannelAdapter.
The createConsumerEndpoint method of RocketMQMessageChannelBinder:
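RocketMQInboundChannelAdapter is an adapter: it has to fit into Spring Framework's retry and callback mechanisms, and it is responsible for subscribing to messages and converting the message format. RocketMQListenerBindingContainer wraps the RocketMQ client API, and the adapter holds an instance of it.
RocketMQ offers two consumption modes: orderly and concurrent. In the RocketMQ client API, the default listener for orderly consumption is DefaultMessageListenerOrderly and the default listener for concurrent consumption is DefaultMessageListenerConcurrently. In either mode, every message the listener receives is passed on to the RocketMQListener callback.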
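The relationship between the two consumption modes and the single callback can be sketched in plain Java. This is a minimal model, not the RocketMQ client API: `SimpleListener`, `BatchListener`, `OrderlyListener`, `ConcurrentListener`, and `ListenerFactory` are hypothetical names invented for illustration. It only shows that whichever mode is selected, each message ends up in the same callback.

```java
import java.util.List;

// Hypothetical stand-in for the RocketMQListener callback.
interface SimpleListener {
    void onMessage(String body);
}

// Hypothetical stand-in for a client-side listener that receives message batches.
interface BatchListener {
    String mode();

    void consume(List<String> batch);
}

// Models orderly consumption: batches are processed one at a time, in order.
final class OrderlyListener implements BatchListener {
    private final SimpleListener delegate;
    private final Object lock = new Object();

    OrderlyListener(SimpleListener delegate) {
        this.delegate = delegate;
    }

    @Override
    public String mode() {
        return "orderly";
    }

    @Override
    public void consume(List<String> batch) {
        synchronized (lock) { // serialize batches so ordering is preserved
            for (String body : batch) {
                delegate.onMessage(body);
            }
        }
    }
}

// Models concurrent consumption: no serialization between batches.
final class ConcurrentListener implements BatchListener {
    private final SimpleListener delegate;

    ConcurrentListener(SimpleListener delegate) {
        this.delegate = delegate;
    }

    @Override
    public String mode() {
        return "concurrent";
    }

    @Override
    public void consume(List<String> batch) {
        for (String body : batch) {
            delegate.onMessage(body);
        }
    }
}

// Picks the listener for the configured mode; both share the same callback.
final class ListenerFactory {
    static BatchListener create(boolean orderly, SimpleListener callback) {
        return orderly ? new OrderlyListener(callback) : new ConcurrentListener(callback);
    }
}
```

The design point is that the mode only changes how batches are scheduled; the business callback is written once and reused by both listeners.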
The RocketMQListener implementation class is created and initialized in RocketMQInboundChannelAdapter.
RocketMQInboundChannelAdapter
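After DefaultMessageListenerOrderly receives a RocketMQ message, the onMessage method of BindingRocketMQListener is called back first, and then the sendMessage method of RocketMQInboundChannelAdapter's parent class is called to send the message to the DirectChannel.
Spring Cloud Stream uses the same messaging model for receiving and for sending: a message received in the Binder is first sent to a MessageChannel, and the subscribed MessageChannel forwards it through a Dispatcher to the corresponding MessageHandler for processing.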
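The inbound flow described here (adapter, then channel, then dispatcher, then handler) can be sketched without any Spring dependency. All types below (`SketchHandler`, `SketchDispatcher`, `SketchDirectChannel`, `SketchInboundAdapter`) are hypothetical simplifications, not the Spring Integration classes. The sketch assumes delivery to the first subscribed handler in the caller's thread and leaves out load balancing, failover, and message headers.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for a MessageHandler.
interface SketchHandler {
    void handle(String payload);
}

// Hypothetical dispatcher: hands each message to one subscribed handler.
final class SketchDispatcher {
    private final List<SketchHandler> handlers = new CopyOnWriteArrayList<>();

    void addHandler(SketchHandler handler) {
        handlers.add(handler);
    }

    void dispatch(String payload) {
        if (handlers.isEmpty()) {
            throw new IllegalStateException("Dispatcher has no subscribers");
        }
        // Simplification: always the first handler, no load balancing or failover.
        handlers.get(0).handle(payload);
    }
}

// Hypothetical point-to-point channel: send() runs the handler in the caller's thread.
final class SketchDirectChannel {
    private final SketchDispatcher dispatcher = new SketchDispatcher();

    void subscribe(SketchHandler handler) {
        dispatcher.addHandler(handler);
    }

    void send(String payload) {
        dispatcher.dispatch(payload);
    }
}

// Hypothetical inbound adapter: converts the raw broker message and forwards it.
final class SketchInboundAdapter {
    private final SketchDirectChannel outputChannel;

    SketchInboundAdapter(SketchDirectChannel outputChannel) {
        this.outputChannel = outputChannel;
    }

    // Called by the broker-side listener for every received message.
    void onMessage(byte[] rawBody) {
        String payload = new String(rawBody, StandardCharsets.UTF_8);
        outputChannel.send(payload);
    }
}
```

Because the channel dispatches synchronously, an exception thrown by the handler propagates back to the adapter, which is what lets the consuming side decide whether to retry.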
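The MessageChannel returned by getOutputChannel() of MessageProducerSupport, the parent class of RocketMQInboundChannelAdapter, is the DirectChannel that was passed in when the RocketMQ Binder was initialized.
The getOutputChannel method of MessageProducerSupport:
MessagingTemplate extends GenericMessagingTemplate, and its doSend() method is what actually sends the message.
The MessageChannel instance is a DirectChannel object, so the message-sending flow described earlier is reused: the message dispatcher class MessageDispatcher dispatches the message to a MessageHandler.
The message handler for the DirectChannel is StreamListenerMessageHandler.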
InvocableHandlerMethod uses Java reflection to perform the callback; StreamListenerMessageHandler and @
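The afterSingletonsInstantiated method of StreamListenerAnnotationBeanPostProcessor:
After all singleton objects managed by the Spring container have been initialized, it iterates over the StreamListenerHandlerMethodMapping entries and creates and initializes the InvocableHandlerMethod and StreamListenerMessageHandler instances.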
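The idea of wrapping a listener method for a reflective callback can be illustrated with the JDK alone. The sketch below is not the Spring implementation: `ReflectiveHandlerMethod`, `ReflectiveMessageHandler`, and `OrderListenerBean` are hypothetical names, and argument resolution, conditions, and return-value handling are all omitted.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Hypothetical, simplified counterpart of a reflective handler method:
// it pairs a target bean with one of its methods.
final class ReflectiveHandlerMethod {
    private final Object bean;
    private final Method method;

    private ReflectiveHandlerMethod(Object bean, Method method) {
        this.bean = bean;
        this.method = method;
        this.method.setAccessible(true); // allow non-public listener methods
    }

    // Looks the method up by name; wraps the checked exception for convenience.
    static ReflectiveHandlerMethod forMethod(Object bean, String name, Class<?>... parameterTypes) {
        try {
            return new ReflectiveHandlerMethod(bean, bean.getClass().getDeclaredMethod(name, parameterTypes));
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No such listener method: " + name, e);
        }
    }

    Object invoke(Object... args) {
        try {
            return method.invoke(bean, args);
        } catch (InvocationTargetException e) {
            // Surface the exception thrown by the listener method itself.
            throw new IllegalStateException("Listener method failed", e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Listener method not accessible", e);
        }
    }
}

// Hypothetical handler that turns an incoming payload into the reflective call.
final class ReflectiveMessageHandler {
    private final ReflectiveHandlerMethod target;

    ReflectiveMessageHandler(ReflectiveHandlerMethod target) {
        this.target = target;
    }

    Object handleMessage(String payload) {
        return target.invoke(payload);
    }
}

// Hypothetical user bean with a listener method.
final class OrderListenerBean {
    String lastOrder;

    void onOrder(String payload) {
        lastOrder = payload;
    }
}
```

A handler built as `new ReflectiveMessageHandler(ReflectiveHandlerMethod.forMethod(bean, "onOrder", String.class))` forwards each payload to `bean.onOrder`. Creating such handlers only after every singleton exists means the target beans are fully initialized by the time the first message arrives.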
StreamListenerHandlerMethodMapping stores the mapping between a StreamListener and an InvocableHandlerMethod; the mapping is created in the postProcessAfterInitialization() method of StreamListenerAnnotationBeanPostProcessor:
@Override
public final Object postProcessAfterInitialization(Object bean, final String beanName)
        throws BeansException {
    Class<?> targetClass = AopUtils.isAopProxy(bean) ? AopUtils.getTargetClass(bean)
            : bean.getClass();
    Method[] uniqueDeclaredMethods = ReflectionUtils
            .getUniqueDeclaredMethods(targetClass);
    for (Method method : uniqueDeclaredMethods) {
        StreamListener streamListener = AnnotatedElementUtils
                .findMergedAnnotation(method, StreamListener.class);
        if (streamListener != null && !method.isBridge()) {
            this.streamListenerCallbacks.add(() -> {
                Assert.isTrue(method.getAnnotation(Input.class) == null,
                        StreamListenerErrorMessages.INPUT_AT_STREAM_LISTENER);
                this.doPostProcess(streamListener, method, bean);
            });
        }
    }
    return bean;
}

private void doPostProcess(StreamListener streamListener, Method method,
        Object bean) {
    streamListener = postProcessAnnotation(streamListener, method);
    Optional<StreamListenerSetupMethodOrchestrator> orchestratorOptional;
    orchestratorOptional = this.streamListenerSetupMethodOrchestrators.stream()
            .filter(t -> t.supports(method)).findFirst();
    Assert.isTrue(orchestratorOptional.isPresent(),
            "A matching StreamListenerSetupMethodOrchestrator must be present");
    StreamListenerSetupMethodOrchestrator streamListenerSetupMethodOrchestrator = orchestratorOptional
            .get();
    streamListenerSetupMethodOrchestrator
            .orchestrateStreamListenerSetupMethod(streamListener, method, bean);
}

@Override
public void orchestrateStreamListenerSetupMethod(StreamListener streamListener,
        Method method, Object bean) {
    String methodAnnotatedInboundName = streamListener.value();
    String methodAnnotatedOutboundName = StreamListenerMethodUtils
            .getOutboundBindingTargetName(method);
    int inputAnnotationCount = StreamListenerMethodUtils
            .inputAnnotationCount(method);
    int outputAnnotationCount = StreamListenerMethodUtils
            .outputAnnotationCount(method);
    boolean isDeclarative = checkDeclarativeMethod(method,
            methodAnnotatedInboundName, methodAnnotatedOutboundName);
    StreamListenerMethodUtils.validateStreamListenerMethod(method,
            inputAnnotationCount, outputAnnotationCount,
            methodAnnotatedInboundName, methodAnnotatedOutboundName,
            isDeclarative, streamListener.condition());
    if (isDeclarative) {
        StreamListenerParameterAdapter[] toSlpaArray;
        toSlpaArray = new StreamListenerParameterAdapter[this.streamListenerParameterAdapters
                .size()];
        Object[] adaptedInboundArguments = adaptAndRetrieveInboundArguments(
                method, methodAnnotatedInboundName, this.applicationContext,
                this.streamListenerParameterAdapters.toArray(toSlpaArray));
        invokeStreamListenerResultAdapter(method, bean,
                methodAnnotatedOutboundName, adaptedInboundArguments);
    }
    else {
        registerHandlerMethodOnListenedChannel(method, streamListener, bean);
    }
}

private void registerHandlerMethodOnListenedChannel(Method method,
        StreamListener streamListener, Object bean) {
    Assert.hasText(streamListener.value(), "The binding name cannot be null");
    if (!StringUtils.hasText(streamListener.value())) {
        throw new BeanInitializationException(
                "A bound component name must be specified");
    }
    final String defaultOutputChannel = StreamListenerMethodUtils
            .getOutboundBindingTargetName(method);
    if (Void.TYPE.equals(method.getReturnType())) {
        Assert.isTrue(StringUtils.isEmpty(defaultOutputChannel),
                "An output channel cannot be specified for a method that does not return a value");
    }
    else {
        Assert.isTrue(!StringUtils.isEmpty(defaultOutputChannel),
                "An output channel must be specified for a method that can return a value");
    }
    StreamListenerMethodUtils.validateStreamListenerMessageHandler(method);
    StreamListenerAnnotationBeanPostProcessor.this.mappedListenerMethods.add(
            streamListener.value(),
            new StreamListenerHandlerMethodMapping(bean, method,
                    streamListener.condition(), defaultOutputChannel,
                    streamListener.copyHeaders()));
}
The StreamListenerHandlerMethodMapping is created and stored by the call to StreamListenerAnnotationBeanPostProcessor.this.mappedListenerMethods.add.
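The two-phase pattern in the code above (collect a deferred callback per annotated method, then fill a binding-name-to-mapping registry later) can be reduced to a small JDK-only sketch. Everything here is hypothetical: `@SketchListener` stands in for the listener annotation, and `SketchMapping`, `SketchListenerRegistry`, and `PaymentBean` are invented for illustration. The sketch assumes the deferred callbacks are run once all beans have been created, and it skips validation of return types and output channels.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical annotation standing in for the listener annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface SketchListener {
    String value(); // name of the binding the method listens on
}

// Hypothetical (bean, method) pair stored per binding name.
final class SketchMapping {
    final Object bean;
    final Method method;

    SketchMapping(Object bean, Method method) {
        this.bean = bean;
        this.method = method;
    }
}

final class SketchListenerRegistry {
    private final List<Runnable> callbacks = new ArrayList<>();
    private final Map<String, List<SketchMapping>> mapped = new LinkedHashMap<>();

    // Phase 1: called once per bean; only records what to do later.
    Object postProcess(Object bean) {
        for (Method method : bean.getClass().getDeclaredMethods()) {
            SketchListener annotation = method.getAnnotation(SketchListener.class);
            if (annotation != null && !method.isBridge()) {
                callbacks.add(() -> register(annotation.value(), bean, method));
            }
        }
        return bean;
    }

    // Phase 2: called after all beans exist; runs the deferred registrations.
    void afterAllBeansCreated() {
        callbacks.forEach(Runnable::run);
        callbacks.clear();
    }

    private void register(String binding, Object bean, Method method) {
        if (binding == null || binding.trim().isEmpty()) {
            throw new IllegalStateException("The binding name cannot be empty");
        }
        // Several methods may listen on the same binding, hence a list per key.
        mapped.computeIfAbsent(binding, key -> new ArrayList<>())
                .add(new SketchMapping(bean, method));
    }

    List<SketchMapping> mappingsFor(String binding) {
        return mapped.getOrDefault(binding, new ArrayList<>());
    }
}

// Hypothetical user bean: one annotated listener method, one plain method.
final class PaymentBean {
    @SketchListener("input")
    void onPayment(String payload) {
    }

    void helper() {
    }
}
```

Deferring the registration keeps bean post-processing cheap and guarantees that every collaborator needed to build a handler is available when the callbacks finally run.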
This is how RocketMQ is used through the Spring Cloud Stream messaging model; the RocketMQ component integrated with Spring Boot can be used instead.