阿里 p7 大佬源码简析 Spring-Integration 执行过程,Github 浏览破万
今日分享开始啦,请大家多多指教~
一,前言
Spring-Integration 基于 Spring,在应用程序中启用了轻量级消息传递,并支持通过声明式适配器与外部系统集成。这一段的介绍,概况了整个 Integration 的用途。个人感觉消息传递是真正的重点。
如上图所示,典型的生产者-消费者模式,中间通过一个特定的通道进行数据传输,说到这,是不是隐隐感觉到 queue 的存在。确实事实上这个所谓的通道默认就是用的 blockingqueue。
今天主要是看个简单的 hello word 进来分析下整个执行过程。
先看下代码:
二,ServiceActivator
上面的代码演示了调用方法的入站通道适配器和标准的出站通道适配器, 它们之间是一个带注解的 ServiceActivator。关于这个 ServiceActivator 就是一个消息端点。
消息端点的主要作用是以非侵入性方式将应用程序代码连接到消息传递框架。换句话说,理想情况下,应用程序代码应该不知道消息对象或消息管道。这类似于 MVC 范式中 controller 的作用。正如 controller 处理 HTTP 请求一样,消息端点处理消息。以及 controller 映射到 URL 模式一样,消息端点映射到消息通道。这两种情况的目标是相同的。
ServiceActivator 是用于将服务实例连接到消息传递系统的通用端点。必须配置输入消息通道,如果要调用的服务方法能够返回值,还可以提供输出消息通道。
具体流程如下图:
上面的代码比较简单,但是或许会发现我们只定义了输出通道 oc,输入通道 ic 竟然没有定义也能正常应用,是不是很奇怪?带着疑问我们先看下 ServiceActivator 的源码:
注释上写得很清楚,如果输入通道不存在,将在应用程序上下文中注册具有此名称的 DirectChannel 。具体在哪定义,我们后面会看到,现在不急,先一步步来看他的执行过程。
我们全局查找 ServiceActivator,看他是哪边进行处理的,最后发现了 MessagingAnnotationPostProcessor 类,用来处理方法级消息注解的 BeanPostProcessor 实现。
在 afterPropertiesSet 方法中,我们看到定义了一个后处理器 postProcessors,里面注册了相关的注解处理类。包含各种消息端点处理,除了上面写的 ServiceActivator,还有过滤器,路由,转换器等各种不同的端点方法。
接着往向下看,既然实现了 BeanPostProcessor,那必然要用到 postProcessAfterInitialization 方法实现,这里的流程大概就是遍历出包含有 @ServiceActivator 的 bean 方法,用来做后续处理。我们直接看重点的代码。
Object result = postProcessor.postProcess(bean, beanName, targetMethod, annotations);
三,postProcess
在 AbstractMethodAnnotationPostProcessor 中有个共通方法 postProcess 用来生成对应的端点信息。具体代码:
这里面主要是两件事:
根据模板模式中不同的 createHandler 抽象方法实现,生成对应的 MessageHandler。譬如说我们这边的 ServiceActivatorAnnotationPostProcessor
将 MessageHandler 实现连接到消息端点,生成对应的 endpoint。
1.createHandler
createHandler 的代码比较简单,就是根据注解中的几个属性还有对应的方法参数,生成 ServiceActivatingHandler。追溯下去 ServiceActivatingHandler 中最后会生成一个委托对象 MessagingMethodInvokerHelper 用来以反射的方式来执行目标方法。
2.createEndpoint
createEndpoint 字面上都能知道是生成消息端点,事实上也是,把生成的 handler 和对应的管道进行关联。具体看下代码体会:
上面的代码中,我们就能清楚地看到为什么我们在 demo 中没有注册输入通道也能正常应用的原因了,从而回答之前的疑问。
通道类型一共有两种,一种是发布订阅,一种是可轮询的,我们是默认是走的第一种,因为 DirectChannel 默认就是个 SubscribableChannel。所以最终我们生成了对应的信息端点类 EventDrivenConsumer。
我们先看下 EventDrivenConsumer 整体结构:
EventDrivenConsumer 上面有一个抽象类 AbstractEndpoint,最上面实现了 Lifecycle 接口,所以生命周期跟着容器走,我们直接跳到 star 方法看:
上面的代码主要就是把 handler 注册到 inputChannel 中,这样只要 inputChannel 通道一收到信息,就会通知他注册的 handlers 进行处理。代码中比较清楚地记录了一切的操作,就不多解释了。
四,发送信息
执行完上面一系列的注册,已经把这一些的通道打通了,剩下的就是真正的发送操作了。下面分析下 inputChannel.send(new GenericMessage<String>("World"));看看 send 操作:
在此频道上发送消息。 如果通道已满,则此方法将阻塞,直到发生超时或发送线程中断。 如果指定的超时时间为 0,则该方法将立即返回。 如果小于零,它将无限期阻塞(请参阅 send(Message) )。
参数:
messageArg – 要发送的消息
timeout - 以毫秒为单位的超时时间
返回:
true 如果消息发送成功, false 如果消息无法在规定时间内发送或发送线程被中断
真正的 send 操作跟下去,会发现层次极深,碍于篇幅,我们直接跟到重点代码:
handleRequestMessage 的操作就是用之前我们 handler 中的委托类 MessagingMethodInvokerHelper 去反射运行对应的端点方法,然后把执行结果发送 outputChannel。最后我们直接定位到具体的发送操作:
看到这,我们就明白了数据的去向,存储在队列里了,生产者产生的数据就已经生成了,所以发送的操作基本上就告一段落了。
五,接收信息
数据已经生成,后面就是看如何消费操作了,下面分析下 outputChannel.receive(0).getPayload()操作:
从该通道接收第一条可用消息。 如果通道不包含任何消息,则此方法将阻塞,直到分配的超时时间过去。 如果指定的超时时间为 0,则该方法将立即返回。 如果小于零,它将无限期阻塞(参见 receive() )。
参数:
timeout - 以毫秒为单位的超时时间
返回:
如果在分配的时间内没有可用的消息或接收线程被中断,则为第一个可用消息或 null 。
最后的 doReceive 操作,其实大家都心知肚明了,就是从上面的队列中直接读取数据,代码比较简单,就不注释了:
六,结语
这一系列的执行过程其实还是比较绕的,Spring-Integration 其实涉及到的应用分支更多。如果让你对 Spring-Integration 产生了兴趣,那本文的目的就达到了。这需要你自己去实地操作研究下,总是有收获的。
今日份分享已结束,请大家多多包涵和指点!
评论