disruptor 笔记之二:Disruptor 类分析,java 的学习网站
本篇概览
通过前文的实战,咱们对 Disruptor 有了初步认识,借助 com.lmax.disruptor.dsl.Disruptor 类可以轻松完成以下操作:
环形队列初始化
指定事件消费者
启动消费者线程
接下来要面对两个问题:
深入了解 Disruptor 类是如何完成上述操作的;
对 Disruptor 类有了足够了解时,尝试不用 Disruptor,自己动手操作环形队列,实现消息的生产和消费,这样做的目的是加深对 Disruptor 内部的认识,做到知其所以然;
接下来咱们先解决第一个问题吧,结合 Disruptor 对象的源码来看看上述三个操作到底做了什么;
环形队列初始化
环形队列初始化发生在实例化 Disruptor 对象的时候,即 Disruptor 的构造方法:
public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory)
{
this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory));
}
RingBuffer.createMultiProducer 方法内部实例化了 RingBuffer,如下图红框:
记下第一个重要知识点:创建 RingBuffer 对象;
指定事件消费者
在前文中,下面这行代码指定了事件由 StringEventHandler 消费:
disruptor.handleEventsWith(new StringEventHandler(eventCountPrinter));
查看 handleEventsWith 方法的内部:
public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
{
return createEventProcessors(new Sequence[0], handlers);
}
展开 createEventProcessors 方法,如下图,请重点关注创建 SequenceBarrier 和 BatchEventProcessor 等操作:
![在这里插入图片描述](https://img-blog.csdnimg.cn/20210529083503239.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2JvbGluZ19jYXZhbHJ5,size_16,color_FFFFFF,t_70
#pic_center)
展开上图红框四中的 updateGatingSequencesForNextInChain 方法,如下图,红框中的 ringBuffer.addGatingSequences 需要重点关注:
小结一下,disruptor.handleEventsWith 方法涉及到四个重要知识点:
创建 SequenceBarrier 对象,用于接收 ringBuffer 中的可消费事件
创建 BatchEventProcessor,负责消费事件
绑定 BatchEventProcessor 对象的异常处理类
调用 ringBuffer.addGatingSequences,将消费者的 Sequence 传给 ringBuffer
启动消费者线程
前文已通过日志确定了消费事件的逻辑是在一个独立的线程中执行的,启动消费者线程的代码如下:
disruptor.start();
展开 start 方法,如下可见,关键代码是 consumerInfo.start(executor):
public RingBuffer<T> start()
{
checkOnlyStartedOnce();
for (final ConsumerInfo consumerInfo : consumerRepository)
{
consumerInfo.start(executor);
}
return ringBuffer;
}
ConsumerInfo 是接口,对应的实现类有 EventProcessorInfo 和 WorkerPoolInfo 两种,这里应该是哪种呢?既然来源是 consumerRepository,这就要看当初是怎么存入 consumerRepository 的,前面在分析 createEventProcessors 方法时,下图红框中的 consumerRepository.add 被忽略了,现在需要进去看看:
进去后一目了然,可见 ConsumerInfo 的实现是 EventProcessorInfo:
所以,回到前面对 consumerInfo.start(executor)方法的分析,这里要看的就是 EventProcessorInfo 的 start 方法了,如下图,非常简单,就是启动一个线程执行 eventprocessor(这个 eventprocessor 是 BatchEventProcessor 对象):
小结一下,disruptor.start 方法涉及到一个重要知识点:
启动独立线程,用来执行消费事件的业务逻辑;
消费事件的逻辑
为了理解消息处理逻辑,还要重点关注 BatchEventProcessor.processEvents 方法,如下图所示,其实也很简单,就是不停的从环形队列取出可用的事件,然后再更新自己的 Sequence,相当于标记已经消费到哪里了:
评论