
Disruptor: Core Source Code Implementation Analysis (Part 2)

  • 2025-05-27
    Fujian

2. Source code analysis of Disruptor consumers


Disruptor consumers are implemented mainly by the BatchEventProcessor and WorkProcessor classes. They are registered through Disruptor's handleEventsWith() or handleEventsWithWorkerPool() method and started through its start() method.

Calling handleEventsWith() to bind consumers creates one BatchEventProcessor per handler and adds it to the Disruptor's consumerRepository field.

Calling handleEventsWithWorkerPool() to bind consumers creates WorkProcessor objects instead; the WorkerPool that wraps them is added to the consumerRepository field.

Calling start() on the Disruptor instance then uses the thread pool to execute the run() method of each BatchEventProcessor, or the run() method of each WorkProcessor.

BatchEventProcessor.run() consumes data from the RingBuffer by advancing the BatchEventProcessor's own sequence.

WorkProcessor.run() likewise consumes data from the RingBuffer by advancing the WorkProcessor's own sequence.
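
The registration and start-up flow described above can be modelled without the Disruptor library. What follows is a minimal sketch, not the real implementation: `MiniDisruptor`, `handleWith`, `processorCount` and `demo` are hypothetical names, and a plain `Runnable` stands in for BatchEventProcessor and WorkProcessor. It only illustrates two points from the paragraphs above: processors are collected in a repository before start-up, and start() hands each of them to the executor exactly once, guarded by a CAS on a `started` flag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified stand-in for Disruptor's registration/start flow.
final class MiniDisruptor {
    private final List<Runnable> consumerRepository = new ArrayList<>();
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final Executor executor;

    MiniDisruptor(Executor executor) {
        this.executor = executor;
    }

    // Plays the role of handleEventsWith(): only allowed before start().
    void handleWith(Runnable processor) {
        if (started.get()) {
            throw new IllegalStateException("All handlers must be added before start().");
        }
        consumerRepository.add(processor);
    }

    // Plays the role of start(): the CAS lets exactly one caller through.
    void start() {
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("start() must only be called once.");
        }
        for (Runnable processor : consumerRepository) {
            executor.execute(processor); // the executor invokes processor.run()
        }
    }

    int processorCount() {
        return consumerRepository.size();
    }

    // Registers two processors, starts them, and returns how many were registered
    // once both have run (or -1 if they did not run in time).
    static int demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            MiniDisruptor disruptor = new MiniDisruptor(pool);
            CountDownLatch ran = new CountDownLatch(2);
            disruptor.handleWith(ran::countDown);
            disruptor.handleWith(ran::countDown);
            disruptor.start();
            return ran.await(5, TimeUnit.SECONDS) ? disruptor.processorCount() : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            pool.shutdown();
        }
    }
}
```

A second call to `start()` on the same instance throws IllegalStateException, which mirrors the behaviour of checkOnlyStartedOnce() in the listings that follow.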


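import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

//OrderEvent, OrderEventFactory, OrderEventHandler and OrderEventProducer are application classes not shown in this section
public class Main {
    public static void main(String[] args) {
        //Prepare the arguments
        OrderEventFactory orderEventFactory = new OrderEventFactory();
        int ringBufferSize = 4;
        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

        //Argument 1: eventFactory, the factory that creates the messages (Events)
        //Argument 2: ringBufferSize, the capacity of the ring buffer
        //Argument 3: executor, the thread pool (a custom pool with its own RejectedExecutionHandler is recommended)
        //Argument 4: ProducerType, single producer or multiple producers
        //Argument 5: waitStrategy, the wait strategy
        //1. Instantiate the Disruptor
        Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(
            orderEventFactory,
            ringBufferSize,
            executor,
            ProducerType.SINGLE,
            new BlockingWaitStrategy()
        );

        //2. Add the Event handler that processes the events,
        //i.e. associate the Disruptor with its consumers
        //Option 1: use handleEventsWith()
        disruptor.handleEventsWith(new OrderEventHandler());
        //Option 2: use handleEventsWithWorkerPool()
        //disruptor.handleEventsWithWorkerPool(workHandlers);

        //3. Start the Disruptor
        disruptor.start();

        //4. Get the container that actually stores the data: the RingBuffer
        RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
        OrderEventProducer producer = new OrderEventProducer(ringBuffer);
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long i = 0; i < 5; i++) {
            bb.putLong(0, i);
            //Publish the data to the ring buffer
            producer.sendData(bb);
        }
        disruptor.shutdown();
        executor.shutdown();
    }
}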


public class Disruptor<T> {
    private final RingBuffer<T> ringBuffer;
    private final Executor executor;
    private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<T>();
    private final AtomicBoolean started = new AtomicBoolean(false);
    private ExceptionHandler<? super T> exceptionHandler;
    ...

    //Bind consumers: register the EventHandlers and create their EventProcessors
    //Set up event handlers to handle events from the ring buffer.
    //These handlers will process events as soon as they become available, in parallel.
    //This method can be used as the start of a chain.
    //For example if the handler A must process events before handler B: dw.handleEventsWith(A).then(B);
    //@param handlers the event handlers that will process events.
    //@return a EventHandlerGroup that can be used to chain dependencies.
    @SuppressWarnings("varargs")
    public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {
        return createEventProcessors(new Sequence[0], handlers);
    }

    //Create the BatchEventProcessors and add them to consumerRepository
    EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences, final EventHandler<? super T>[] eventHandlers) {
        checkNotStarted();
        final Sequence[] processorSequences = new Sequence[eventHandlers.length];
        final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
        for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) {
            final EventHandler<? super T> eventHandler = eventHandlers[i];
            //Create the BatchEventProcessor
            final BatchEventProcessor<T> batchEventProcessor =
                new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);
            if (exceptionHandler != null) {
                batchEventProcessor.setExceptionHandler(exceptionHandler);
            }
            //Add the BatchEventProcessor to consumerRepository
            consumerRepository.add(batchEventProcessor, eventHandler, barrier);
            //One consumer thread corresponds to one batchEventProcessor
            //Each batchEventProcessor holds a Sequence that records the progress of that consumer thread
            processorSequences[i] = batchEventProcessor.getSequence();
        }
        //Add the Sequence held by each consumer thread to the gatingSequences field (a Sequence[]) of the producer's Sequencer
        updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
        return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
    }

    private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences) {
        if (processorSequences.length > 0) {
            ringBuffer.addGatingSequences(processorSequences);
            for (final Sequence barrierSequence : barrierSequences) {
                ringBuffer.removeGatingSequence(barrierSequence);
            }
            consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
        }
    }

    private void checkNotStarted() {
        //The started switch is an AtomicBoolean, i.e. it is backed by CAS
        if (started.get()) {
            throw new IllegalStateException("All event handlers must be added before calling starts.");
        }
    }
    ...

    //Starts the event processors and returns the fully configured ring buffer.
    //The ring buffer is set up to prevent overwriting any entry that is yet to be processed by the slowest event processor.
    //This method must only be called once after all event processors have been added.
    //@return the configured ring buffer.
    public RingBuffer<T> start() {
        checkOnlyStartedOnce();
        for (final ConsumerInfo consumerInfo : consumerRepository) {
            //When Disruptor.handleEventsWith() calls Disruptor.createEventProcessors(),
            //each new BatchEventProcessor is wrapped in an EventProcessorInfo (a ConsumerInfo)
            //and added to consumerRepository through add(),
            //so the call below ends up in EventProcessorInfo.start()
            consumerInfo.start(executor);
        }
        return ringBuffer;
    }

    private void checkOnlyStartedOnce() {
        //The started switch is flipped with a CAS
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("Disruptor.start() must only be called once.");
        }
    }
    ...
}

//Provides a repository mechanism to associate EventHandlers with EventProcessors
class ConsumerRepository<T> implements Iterable<ConsumerInfo> {
    private final Map<EventHandler<?>, EventProcessorInfo<T>> eventProcessorInfoByEventHandler = new IdentityHashMap<EventHandler<?>, EventProcessorInfo<T>>();
    private final Map<Sequence, ConsumerInfo> eventProcessorInfoBySequence = new IdentityHashMap<Sequence, ConsumerInfo>();
    private final Collection<ConsumerInfo> consumerInfos = new ArrayList<ConsumerInfo>();

    //Add a BatchEventProcessor to consumerRepository
    public void add(final EventProcessor eventprocessor, final EventHandler<? super T> handler, final SequenceBarrier barrier) {
        //Wrap the BatchEventProcessor in an EventProcessorInfo, which is a ConsumerInfo
        final EventProcessorInfo<T> consumerInfo = new EventProcessorInfo<T>(eventprocessor, handler, barrier);
        eventProcessorInfoByEventHandler.put(handler, consumerInfo);
        eventProcessorInfoBySequence.put(eventprocessor.getSequence(), consumerInfo);
        consumerInfos.add(consumerInfo);
    }
    ...
}

class EventProcessorInfo<T> implements ConsumerInfo {
    private final EventProcessor eventprocessor;
    private final EventHandler<? super T> handler;
    private final SequenceBarrier barrier;
    private boolean endOfChain = true;

    EventProcessorInfo(final EventProcessor eventprocessor, final EventHandler<? super T> handler, final SequenceBarrier barrier) {
        this.eventprocessor = eventprocessor;
        this.handler = handler;
        this.barrier = barrier;
    }
    ...

    @Override
    public void start(final Executor executor) {
        //Run the BatchEventProcessor's run() method on the given thread pool,
        //which is the pool that was passed in when the Disruptor was constructed
        executor.execute(eventprocessor);
    }
    ...
}

//Convenience class for handling the batching semantics of consuming entries from
//a RingBuffer and delegating the available events to an EventHandler.
//If the EventHandler also implements LifecycleAware it will be notified just after
//the thread is started and just before the thread is shutdown.
//@param <T> event implementation storing the data for sharing during exchange or parallel coordination of an event.
public final class BatchEventProcessor<T> implements EventProcessor {
    //run() below compares against IDLE and RUNNING, so the state is an int, not a boolean
    private static final int IDLE = 0;
    private static final int HALTED = IDLE + 1;
    private static final int RUNNING = HALTED + 1;

    private final AtomicInteger running = new AtomicInteger(IDLE);
    private ExceptionHandler<? super T> exceptionHandler = new FatalExceptionHandler();
    private final DataProvider<T> dataProvider;
    private final SequenceBarrier sequenceBarrier;
    private final EventHandler<? super T> eventHandler;
    private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    private final TimeoutHandler timeoutHandler;
    private final BatchStartAware batchStartAware;

    //Construct a EventProcessor that will automatically track the progress by
    //updating its sequence when the EventHandler#onEvent(Object, long, boolean) method returns.
    //@param dataProvider to which events are published.
    //@param sequenceBarrier on which it is waiting.
    //@param eventHandler is the delegate to which events are dispatched.
    public BatchEventProcessor(final DataProvider<T> dataProvider, final SequenceBarrier sequenceBarrier, final EventHandler<? super T> eventHandler) {
        //The dataProvider passed in is the Disruptor's ringBuffer
        this.dataProvider = dataProvider;
        this.sequenceBarrier = sequenceBarrier;
        this.eventHandler = eventHandler;
        if (eventHandler instanceof SequenceReportingEventHandler) {
            ((SequenceReportingEventHandler<?>) eventHandler).setSequenceCallback(sequence);
        }
        batchStartAware = (eventHandler instanceof BatchStartAware) ? (BatchStartAware) eventHandler : null;
        timeoutHandler = (eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;
    }
    ...

    //It is ok to have another thread rerun this method after a halt().
    //Consumes the data in the RingBuffer by advancing sequence
    @Override
    public void run() {
        if (running.compareAndSet(IDLE, RUNNING)) {
            sequenceBarrier.clearAlert();
            notifyStart();
            try {
                if (running.get() == RUNNING) {
                    processEvents();
                }
            } finally {
                notifyShutdown();
                running.set(IDLE);
            }
        } else {
            //This is a little bit of guess work.
            //The running state could have changed to HALTED by this point.
            //However, Java does not have compareAndExchange which is the only way to get it exactly correct.
            if (running.get() == RUNNING) {
                throw new IllegalStateException("Thread is already running");
            } else {
                earlyExit();
            }
        }
    }

    private void processEvents() {
        T event = null;
        long nextSequence = sequence.get() + 1L;

        while (true) {
            try {
                //sequenceBarrier.waitFor() decides whether the consumer has to wait for the producer to publish
                final long availableSequence = sequenceBarrier.waitFor(nextSequence);
                if (batchStartAware != null) {
                    batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
                }
                while (nextSequence <= availableSequence) {
                    //Fetch the data to consume from the RingBuffer
                    event = dataProvider.get(nextSequence);
                    //Invoke the consumer's onEvent() implementation to consume the data
                    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                    nextSequence++;
                }
                //Record how far this consumer has got
                sequence.set(availableSequence);
            } catch (final TimeoutException e) {
                notifyTimeout(sequence.get());
            } catch (final AlertException ex) {
                if (running.get() != RUNNING) {
                    break;
                }
            } catch (final Throwable ex) {
                handleEventException(ex, nextSequence, event);
                sequence.set(nextSequence);
                nextSequence++;
            }
        }
    }

    private void earlyExit() {
        notifyStart();
        notifyShutdown();
    }

    private void notifyTimeout(final long availableSequence) {
        try {
            if (timeoutHandler != null) {
                timeoutHandler.onTimeout(availableSequence);
            }
        } catch (Throwable e) {
            handleEventException(e, availableSequence, null);
        }
    }

    //Notifies the EventHandler when this processor is starting up
    private void notifyStart() {
        if (eventHandler instanceof LifecycleAware) {
            try {
                ((LifecycleAware) eventHandler).onStart();
            } catch (final Throwable ex) {
                handleOnStartException(ex);
            }
        }
    }

    //Notifies the EventHandler immediately prior to this processor shutting down
    private void notifyShutdown() {
        if (eventHandler instanceof LifecycleAware) {
            try {
                ((LifecycleAware) eventHandler).onShutdown();
            } catch (final Throwable ex) {
                handleOnShutdownException(ex);
            }
        }
    }
    ...
}
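
The batching behaviour of processEvents() is easier to see in isolation. The sketch below is a single-threaded model under stated assumptions, not Disruptor code: `BatchLoopSketch`, its `Handler` interface, `publish`, `drain` and `demo` are hypothetical names, a plain `long` replaces the cursor that waitFor() would observe, and there is no protection against the producer wrapping past the consumer. It shows that everything published so far is handled as one batch, that only the last event of the batch sees endOfBatch set to true, and that the consumer's sequence is updated once per batch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-threaded model of the loop in BatchEventProcessor.processEvents().
final class BatchLoopSketch {
    // Stand-in for EventHandler.onEvent(event, sequence, endOfBatch).
    interface Handler {
        void onEvent(String event, long sequence, boolean endOfBatch);
    }

    private final String[] ring;   // slot array, size is a power of two
    private final int indexMask;   // size - 1, used instead of the modulo operator
    private long cursor = -1L;     // last published sequence (producer side)
    private long consumed = -1L;   // the consumer's own sequence

    BatchLoopSketch(int size) {
        if (Integer.bitCount(size) != 1) {
            throw new IllegalArgumentException("size must be a power of 2");
        }
        ring = new String[size];
        indexMask = size - 1;
    }

    void publish(String event) {
        long next = cursor + 1;
        ring[(int) (next & indexMask)] = event;
        cursor = next;
    }

    // One pass of the loop: take everything published so far as a single batch.
    int drain(Handler handler) {
        long nextSequence = consumed + 1L;
        long availableSequence = cursor; // what sequenceBarrier.waitFor() would return
        int handled = 0;
        while (nextSequence <= availableSequence) {
            String event = ring[(int) (nextSequence & indexMask)];
            handler.onEvent(event, nextSequence, nextSequence == availableSequence);
            nextSequence++;
            handled++;
        }
        if (handled > 0) {
            consumed = availableSequence; // progress is recorded once per batch
        }
        return handled;
    }

    long consumedSequence() {
        return consumed;
    }

    // Publishes three events, drains them, and returns a log of the callbacks.
    static List<String> demo() {
        BatchLoopSketch sketch = new BatchLoopSketch(4);
        sketch.publish("a");
        sketch.publish("b");
        sketch.publish("c");
        List<String> log = new ArrayList<>();
        sketch.drain((event, seq, endOfBatch) -> log.add(event + "@" + seq + (endOfBatch ? "!" : "")));
        return log; // [a@0, b@1, c@2!]
    }
}
```

Updating the sequence once per batch rather than once per event keeps writes to the shared Sequence rare, which is the point of batching on the consumer side.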


public class Disruptor<T> {
    private final RingBuffer<T> ringBuffer;
    private final Executor executor;
    private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<T>();
    private final AtomicBoolean started = new AtomicBoolean(false);
    private ExceptionHandler<? super T> exceptionHandler;
    ...

    //Register the WorkHandlers and create their WorkProcessors
    //Set up a WorkerPool to distribute an event to one of a pool of work handler threads.
    //Each event will only be processed by one of the work handlers.
    //The Disruptor will automatically start this processors when #start() is called.
    //@param workHandlers the work handlers that will process events.
    //@return a {@link EventHandlerGroup} that can be used to chain dependencies.
    @SafeVarargs
    @SuppressWarnings("varargs")
    public final EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers) {
        return createWorkerPool(new Sequence[0], workHandlers);
    }

    //Create the WorkerPool and add it to consumerRepository
    EventHandlerGroup<T> createWorkerPool(final Sequence[] barrierSequences, final WorkHandler<? super T>[] workHandlers) {
        final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
        //Create the WorkerPool, which creates one WorkProcessor per WorkHandler
        final WorkerPool<T> workerPool = new WorkerPool<>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers);
        //Add the WorkerPool to consumerRepository
        consumerRepository.add(workerPool, sequenceBarrier);
        final Sequence[] workerSequences = workerPool.getWorkerSequences();
        //Add the Sequence held by each consumer thread to the gatingSequences field (a Sequence[]) of the producer's Sequencer
        updateGatingSequencesForNextInChain(barrierSequences, workerSequences);
        return new EventHandlerGroup<>(this, consumerRepository, workerSequences);
    }

    private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences) {
        if (processorSequences.length > 0) {
            ringBuffer.addGatingSequences(processorSequences);
            for (final Sequence barrierSequence : barrierSequences) {
                ringBuffer.removeGatingSequence(barrierSequence);
            }
            consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
        }
    }
    ...

    //Starts the event processors and returns the fully configured ring buffer.
    //The ring buffer is set up to prevent overwriting any entry that is yet to be processed by the slowest event processor.
    //This method must only be called once after all event processors have been added.
    //@return the configured ring buffer.
    public RingBuffer<T> start() {
        checkOnlyStartedOnce();
        for (final ConsumerInfo consumerInfo : consumerRepository) {
            //When Disruptor.handleEventsWithWorkerPool() calls Disruptor.createWorkerPool(),
            //the new WorkerPool is wrapped in a WorkerPoolInfo (a ConsumerInfo)
            //and added to consumerRepository through add(),
            //so the call below ends up in WorkerPoolInfo.start()
            consumerInfo.start(executor);
        }
        return ringBuffer;
    }

    private void checkOnlyStartedOnce() {
        //The started switch is flipped with a CAS
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("Disruptor.start() must only be called once.");
        }
    }
    ...
}

//Provides a repository mechanism to associate EventHandlers with EventProcessors
class ConsumerRepository<T> implements Iterable<ConsumerInfo> {
    private final Map<EventHandler<?>, EventProcessorInfo<T>> eventProcessorInfoByEventHandler = new IdentityHashMap<EventHandler<?>, EventProcessorInfo<T>>();
    private final Map<Sequence, ConsumerInfo> eventProcessorInfoBySequence = new IdentityHashMap<Sequence, ConsumerInfo>();
    private final Collection<ConsumerInfo> consumerInfos = new ArrayList<ConsumerInfo>();

    //Add a WorkerPool to consumerRepository
    public void add(final WorkerPool<T> workerPool, final SequenceBarrier sequenceBarrier) {
        final WorkerPoolInfo<T> workerPoolInfo = new WorkerPoolInfo<>(workerPool, sequenceBarrier);
        consumerInfos.add(workerPoolInfo);
        for (Sequence sequence : workerPool.getWorkerSequences()) {
            eventProcessorInfoBySequence.put(sequence, workerPoolInfo);
        }
    }
    ...
}

class WorkerPoolInfo<T> implements ConsumerInfo {
    private final WorkerPool<T> workerPool;
    private final SequenceBarrier sequenceBarrier;
    private boolean endOfChain = true;

    WorkerPoolInfo(final WorkerPool<T> workerPool, final SequenceBarrier sequenceBarrier) {
        this.workerPool = workerPool;
        this.sequenceBarrier = sequenceBarrier;
    }

    @Override
    public void start(Executor executor) {
        workerPool.start(executor);
    }
    ...
}

public final class WorkerPool<T> {
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final Sequence workSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    private final RingBuffer<T> ringBuffer;
    //WorkProcessors are created to wrap each of the provided WorkHandlers
    private final WorkProcessor<?>[] workProcessors;

    //Create a worker pool to enable an array of WorkHandlers to consume published sequences.
    //This option requires a pre-configured RingBuffer which must have RingBuffer#addGatingSequences(Sequence...) called before the work pool is started.
    //@param ringBuffer of events to be consumed.
    //@param sequenceBarrier on which the workers will depend.
    //@param exceptionHandler to callback when an error occurs which is not handled by the {@link WorkHandler}s.
    //@param workHandlers to distribute the work load across.
    @SafeVarargs
    public WorkerPool(final RingBuffer<T> ringBuffer, final SequenceBarrier sequenceBarrier, final ExceptionHandler<? super T> exceptionHandler, final WorkHandler<? super T>... workHandlers) {
        this.ringBuffer = ringBuffer;
        final int numWorkers = workHandlers.length;
        //Create one WorkProcessor per WorkHandler
        workProcessors = new WorkProcessor[numWorkers];
        for (int i = 0; i < numWorkers; i++) {
            workProcessors[i] = new WorkProcessor<>(ringBuffer, sequenceBarrier, workHandlers[i], exceptionHandler, workSequence);
        }
    }

    //Start the worker pool processing events in sequence.
    //@param executor providing threads for running the workers.
    //@return the {@link RingBuffer} used for the work queue.
    //@throws IllegalStateException if the pool has already been started and not halted yet
    public RingBuffer<T> start(final Executor executor) {
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("WorkerPool has already been started and cannot be restarted until halted.");
        }
        final long cursor = ringBuffer.getCursor();
        workSequence.set(cursor);
        for (WorkProcessor<?> processor : workProcessors) {
            processor.getSequence().set(cursor);
            //Run the WorkProcessor's run() method on the given thread pool
            executor.execute(processor);
        }
        return ringBuffer;
    }
    ...
}

public final class WorkProcessor<T> implements EventProcessor {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
    private final RingBuffer<T> ringBuffer;
    private final SequenceBarrier sequenceBarrier;
    private final WorkHandler<? super T> workHandler;
    private final ExceptionHandler<? super T> exceptionHandler;
    private final Sequence workSequence;
    private final EventReleaser eventReleaser = new EventReleaser() {
        @Override
        public void release() {
            sequence.set(Long.MAX_VALUE);
        }
    };
    private final TimeoutHandler timeoutHandler;

    //Construct a {@link WorkProcessor}.
    //@param ringBuffer to which events are published.
    //@param sequenceBarrier on which it is waiting.
    //@param workHandler is the delegate to which events are dispatched.
    //@param exceptionHandler to be called back when an error occurs
    //@param workSequence from which to claim the next event to be worked on. It should always be initialised as Sequencer#INITIAL_CURSOR_VALUE
    public WorkProcessor(final RingBuffer<T> ringBuffer, final SequenceBarrier sequenceBarrier, final WorkHandler<? super T> workHandler, final ExceptionHandler<? super T> exceptionHandler, final Sequence workSequence) {
        this.ringBuffer = ringBuffer;
        this.sequenceBarrier = sequenceBarrier;
        this.workHandler = workHandler;
        this.exceptionHandler = exceptionHandler;
        this.workSequence = workSequence;
        if (this.workHandler instanceof EventReleaseAware) {
            ((EventReleaseAware) this.workHandler).setEventReleaser(eventReleaser);
        }
        timeoutHandler = (workHandler instanceof TimeoutHandler) ? (TimeoutHandler) workHandler : null;
    }

    //Consumes the data in the RingBuffer by advancing sequence
    @Override
    public void run() {
        if (!running.compareAndSet(false, true)) {
            throw new IllegalStateException("Thread is already running");
        }
        sequenceBarrier.clearAlert();
        notifyStart();
        boolean processedSequence = true;
        long cachedAvailableSequence = Long.MIN_VALUE;
        long nextSequence = sequence.get();
        T event = null;
        while (true) {
            try {
                if (processedSequence) {
                    processedSequence = false;
                    do {
                        nextSequence = workSequence.get() + 1L;
                        //Record how far this consumer has got
                        sequence.set(nextSequence - 1L);
                    } while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
                }
                if (cachedAvailableSequence >= nextSequence) {
                    //Fetch the data to consume from the RingBuffer
                    event = ringBuffer.get(nextSequence);
                    //Invoke the consumer's onEvent() implementation to consume the data
                    workHandler.onEvent(event);
                    processedSequence = true;
                } else {
                    //sequenceBarrier.waitFor() decides whether the consumer has to wait for the producer to publish
                    cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
                }
            } catch (final TimeoutException e) {
                notifyTimeout(sequence.get());
            } catch (final AlertException ex) {
                if (!running.get()) {
                    break;
                }
            } catch (final Throwable ex) {
                //handle, mark as processed, unless the exception handler threw an exception
                exceptionHandler.handleEventException(ex, nextSequence, event);
                processedSequence = true;
            }
        }
        notifyShutdown();
        running.set(false);
    }
    ...
}
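
The key difference from BatchEventProcessor is the shared workSequence: every WorkProcessor in a pool claims its next sequence with a CAS on that one counter, so each event goes to exactly one worker. The following sketch isolates that claim loop. It is an illustration under assumptions rather than library code: `WorkClaimSketch`, `claimNext` and `demo` are hypothetical names, and an `AtomicLong` stands in for Disruptor's Sequence.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of how the WorkProcessors of one pool share a workSequence.
final class WorkClaimSketch {
    // Same shape as the do/while in WorkProcessor.run():
    // read the shared counter, then CAS it forward by one; retry if another worker won.
    static long claimNext(AtomicLong workSequence) {
        long next;
        do {
            next = workSequence.get() + 1L;
        } while (!workSequence.compareAndSet(next - 1L, next));
        return next;
    }

    // Runs `workers` threads that together claim the sequences 0..total-1.
    // Returns the number of sequences claimed twice, or -1 if something went wrong.
    static int demo(int workers, long total) {
        AtomicLong workSequence = new AtomicLong(-1L);
        Set<Long> claimed = ConcurrentHashMap.newKeySet();
        AtomicInteger duplicates = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(workers);
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            for (int i = 0; i < workers; i++) {
                pool.execute(() -> {
                    long seq;
                    while ((seq = claimNext(workSequence)) < total) {
                        if (!claimed.add(seq)) {
                            duplicates.incrementAndGet();
                        }
                    }
                    done.countDown();
                });
            }
            if (!done.await(10, TimeUnit.SECONDS)) {
                return -1;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            pool.shutdown();
        }
        return claimed.size() == total ? duplicates.get() : -1;
    }
}
```

Because a sequence is owned by whichever thread wins the CAS, the order in which a pool handles events is not guaranteed, unlike the strictly ordered handling inside a single BatchEventProcessor.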


public class Disruptor<T> {
    private final RingBuffer<T> ringBuffer;

    private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences) {
        if (processorSequences.length > 0) {
            ringBuffer.addGatingSequences(processorSequences);
            for (final Sequence barrierSequence : barrierSequences) {
                ringBuffer.removeGatingSequence(barrierSequence);
            }
            consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
        }
    }
    ...
}

abstract class RingBufferPad {
    protected long p1, p2, p3, p4, p5, p6, p7;
}

abstract class RingBufferFields<E> extends RingBufferPad {
    ...
    private static final Unsafe UNSAFE = Util.getUnsafe();
    private final long indexMask;
    //The ring array that stores the event messages
    private final Object[] entries;
    protected final int bufferSize;
    //The sequencer field represents the producer side of this RingBuffer
    protected final Sequencer sequencer;

    RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();
        if (bufferSize < 1) {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.indexMask = bufferSize - 1;
        //Allocate the array
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        //Pre-allocate every slot up front
        fill(eventFactory);
    }

    private void fill(EventFactory<E> eventFactory) {
        for (int i = 0; i < bufferSize; i++) {
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }

    protected final E elementAt(long sequence) {
        return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
    }
    ...
}

public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> {
    ...
    //Add the specified gating sequences to this instance of the Disruptor.
    //They will safely and atomically added to the list of gating sequences.
    //@param gatingSequences The sequences to add.
    public void addGatingSequences(Sequence... gatingSequences) {
        sequencer.addGatingSequences(gatingSequences);
    }
    ...
}

public interface Sequencer extends Cursored, Sequenced {
    ...
    //Add the specified gating sequences to this instance of the Disruptor.
    //They will safely and atomically added to the list of gating sequences.
    //@param gatingSequences The sequences to add.
    void addGatingSequences(Sequence... gatingSequences);
    ...
}

public abstract class AbstractSequencer implements Sequencer {
    private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");
    ...

    @Override
    public final void addGatingSequences(Sequence... gatingSequences) {
        SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
    }
    ...
}

class SequenceGroups {
    static <T> void addSequences(final T holder, final AtomicReferenceFieldUpdater<T, Sequence[]> updater, final Cursored cursor, final Sequence... sequencesToAdd) {
        long cursorSequence;
        Sequence[] updatedSequences;
        Sequence[] currentSequences;

        do {
            currentSequences = updater.get(holder);
            updatedSequences = copyOf(currentSequences, currentSequences.length + sequencesToAdd.length);
            cursorSequence = cursor.getCursor();

            int index = currentSequences.length;
            for (Sequence sequence : sequencesToAdd) {
                sequence.set(cursorSequence);
                updatedSequences[index++] = sequence;
            }
        } while (!updater.compareAndSet(holder, currentSequences, updatedSequences));

        cursorSequence = cursor.getCursor();
        for (Sequence sequence : sequencesToAdd) {
            sequence.set(cursorSequence);
        }
    }
    ...
}
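
Two ideas are combined in this listing: the gatingSequences array is replaced copy-on-write with a CAS, and the producer then uses the slowest of those sequences to decide whether it may claim another slot. The sketch below models both with the JDK only. It rests on assumptions: `GatingSketch`, `publishUpTo` and `minimumGatingSequence` are hypothetical names, `AtomicLong` stands in for Sequence, and the capacity check is a simplified reading of what a sequencer does (next sequence minus buffer size must not pass the slowest consumer), not a copy of any Disruptor method.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical model of AbstractSequencer.gatingSequences and SequenceGroups.addSequences().
final class GatingSketch {
    private static final AtomicReferenceFieldUpdater<GatingSketch, AtomicLong[]> UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(GatingSketch.class, AtomicLong[].class, "gatingSequences");

    // The field must be volatile for AtomicReferenceFieldUpdater to accept it.
    private volatile AtomicLong[] gatingSequences = new AtomicLong[0];
    private final AtomicLong cursor = new AtomicLong(-1L);

    // Copy-on-write add: build a longer array, then swap it in with a CAS;
    // if another thread changed the array in the meantime, start over.
    void addGatingSequences(AtomicLong... toAdd) {
        AtomicLong[] current;
        AtomicLong[] updated;
        do {
            current = UPDATER.get(this);
            updated = Arrays.copyOf(current, current.length + toAdd.length);
            long cursorValue = cursor.get();
            int index = current.length;
            for (AtomicLong sequence : toAdd) {
                sequence.set(cursorValue); // a new consumer starts at the producer's position
                updated[index++] = sequence;
            }
        } while (!UPDATER.compareAndSet(this, current, updated));
    }

    // The slowest consumer (or the cursor itself if it is even lower).
    long minimumGatingSequence() {
        long min = cursor.get();
        for (AtomicLong sequence : gatingSequences) {
            min = Math.min(min, sequence.get());
        }
        return min;
    }

    // Tries to publish `attempts` times; an attempt succeeds only if the slot
    // it would overwrite has already been consumed by every gating sequence.
    int publishUpTo(int attempts, int bufferSize) {
        int published = 0;
        for (int i = 0; i < attempts; i++) {
            long wrapPoint = cursor.get() + 1L - bufferSize;
            if (wrapPoint <= minimumGatingSequence()) {
                cursor.incrementAndGet();
                published++;
            }
        }
        return published;
    }
}
```

With a buffer of 4 slots and one consumer that has consumed nothing, `publishUpTo(10, 4)` publishes 4 events and then stalls; once the consumer's sequence moves to 0, exactly one more slot becomes available. This is why every consumer Sequence has to be registered as a gating sequence before the Disruptor starts.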


3. Analysis of Disruptor's WaitStrategy


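When a producer publishes a message, it calls the WaitStrategy's signalAllWhenBlocking() method to wake up any blocked consumers. When a consumer consumes messages, it calls the WaitStrategy's waitFor() method, which holds back a consumer that has caught up with the producer.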
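
The hand-off between the two methods can be sketched with a lock and a condition variable. This is a minimal model under assumptions, not the library's code: `BlockingWaitSketch`, `publish` and `demo` are hypothetical names, an `AtomicLong` plays the cursor, and the alert checks and dependent sequences that the real strategies handle are left out.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model of a blocking wait strategy: waitFor() on the consumer side,
// signalAll() on the producer side.
final class BlockingWaitSketch {
    private final Lock lock = new ReentrantLock();
    private final Condition published = lock.newCondition();
    private final AtomicLong cursor = new AtomicLong(-1L);

    // Consumer side: block until `sequence` has been published, then return the cursor.
    long waitFor(long sequence) throws InterruptedException {
        long available;
        if ((available = cursor.get()) < sequence) {
            lock.lock();
            try {
                // Re-check under the lock so a signal sent in between is not lost.
                while ((available = cursor.get()) < sequence) {
                    published.await();
                }
            } finally {
                lock.unlock();
            }
        }
        return available;
    }

    // Producer side: advance the cursor first, then wake every waiting consumer.
    void publish(long sequence) {
        cursor.set(sequence);
        lock.lock();
        try {
            published.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // A consumer thread waits for sequence 2 while this thread publishes 0, 1 and 2.
    static long demo() {
        BlockingWaitSketch sketch = new BlockingWaitSketch();
        long[] seen = {Long.MIN_VALUE};
        Thread consumer = new Thread(() -> {
            try {
                seen[0] = sketch.waitFor(2L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            for (long i = 0; i <= 2; i++) {
                sketch.publish(i);
            }
            consumer.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Long.MIN_VALUE;
        }
        return seen[0];
    }
}
```

The producer pays for a lock acquisition on every publish in this model, which is the cost the non-blocking strategies avoid.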

 

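Not every strategy actually blocks the consumer, though. BlockingWaitStrategy blocks it with a ReentrantLock and a Condition, whereas YieldingWaitStrategy lets the consumer wait without a lock: the consumer spins on the sequence and calls Thread.yield() so that other threads can run between checks.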
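
A lock-free wait of this kind fits in a few lines. The sketch below is a simplified model with hypothetical names (`YieldingWaitSketch`, `demo`); the spin budget of 100 matches the SPIN_TRIES constant in the YieldingWaitStrategy source quoted later in this article, and the alert check is omitted.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of a spin-then-yield wait: no lock, no condition variable.
final class YieldingWaitSketch {
    static final int SPIN_TRIES = 100;

    // Returns the counter to use for the next check.
    // While the budget lasts the thread just spins; afterwards it yields on every check.
    static int applyWaitMethod(int counter) {
        if (counter == 0) {
            Thread.yield(); // let another runnable thread use the core
        } else {
            --counter;
        }
        return counter;
    }

    // Consumer side: poll `published` until it reaches `sequence`.
    static long waitFor(long sequence, AtomicLong published) {
        long available;
        int counter = SPIN_TRIES;
        while ((available = published.get()) < sequence) {
            counter = applyWaitMethod(counter);
        }
        return available;
    }

    // A producer thread publishes 0..5 while this thread waits for 5.
    static long demo() {
        AtomicLong published = new AtomicLong(-1L);
        Thread producer = new Thread(() -> {
            for (long i = 0; i <= 5; i++) {
                published.set(i);
            }
        });
        producer.start();
        long seen = waitFor(5L, published);
        try {
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }
}
```

Since the consumer never parks, the producer has nothing to signal, which is why signalAllWhenBlocking() is empty for the spinning strategies.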

 

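The spinning strategies are therefore the fastest and also the most CPU-hungry. YieldingWaitStrategy spins 100 times and then calls Thread.yield() on every further check; BusySpinWaitStrategy never gives up its core at all, which is why its Javadoc recommends binding the thread to a specific CPU core.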

 

Disruptor provides the following wait strategies:


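1. BlockingWaitStrategy: fully blocking
2. YieldingWaitStrategy: spins, then yields the thread
3. BusySpinWaitStrategy: busy-spins
4. LiteBlockingWaitStrategy: lightly blocking; when waking blocked threads it uses an atomic flag (CAS) to avoid taking the lock when no consumer is waiting
5. SleepingWaitStrategy: yields the thread, then sleeps for the minimum interval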


Summary:


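1. For very low latency when plenty of CPU is available: spin and yield, so that several threads take turns checking.
2. For very low latency with a core dedicated to the consumer: let that single thread busy-spin.
3. For reasonable latency with more regard for CPU usage: spin, then yield, then sleep for the minimum interval.
4. To save CPU regardless of latency: block and wake up with a reentrant lock.
5. To save CPU while recovering a little latency: use a reentrant lock plus a CAS-guarded wake-up.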
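
The last option deserves a closer look, because the saving comes entirely from one atomic flag. The sketch below models only that flag and counts how often the producer would have to take the lock. It is an illustration with hypothetical names (`SignalElisionSketch`, `aboutToWait`, `lockedSignals`, `demo`); the lock and condition themselves are replaced by a counter.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the signalNeeded flag used to skip unnecessary wake-ups.
final class SignalElisionSketch {
    private final AtomicBoolean signalNeeded = new AtomicBoolean(false);
    private final AtomicInteger lockedSignals = new AtomicInteger();

    // Consumer side: called just before the consumer would block.
    void aboutToWait() {
        signalNeeded.getAndSet(true);
    }

    // Producer side: pay for lock + signalAll() only if a consumer asked for it.
    boolean signalAllWhenBlocking() {
        if (signalNeeded.getAndSet(false)) {
            lockedSignals.incrementAndGet(); // stands in for lock(); signalAll(); unlock();
            return true;
        }
        return false;
    }

    int lockedSignals() {
        return lockedSignals.get();
    }

    // 1000 publishes, during which a consumer announces a wait only twice.
    static int demo() {
        SignalElisionSketch sketch = new SignalElisionSketch();
        for (int i = 0; i < 1000; i++) {
            if (i == 10 || i == 500) {
                sketch.aboutToWait();
            }
            sketch.signalAllWhenBlocking();
        }
        return sketch.lockedSignals(); // 2
    }
}
```

In this model 998 of the 1000 publishes cost a single atomic exchange instead of a lock acquisition. A fully blocking strategy would take the lock on all 1000.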


//Fully blocking wait strategy
//Blocking strategy that uses a lock and condition variable for EventProcessors waiting on a barrier.
//This strategy can be used when throughput and low-latency are not as important as CPU resource.
public final class BlockingWaitStrategy implements WaitStrategy {
    private final Lock lock = new ReentrantLock();
    private final Condition processorNotifyCondition = lock.newCondition();

    @Override
    public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier) throws AlertException, InterruptedException {
        long availableSequence;
        if ((availableSequence = cursorSequence.get()) < sequence) {
            lock.lock();
            try {
                while ((availableSequence = cursorSequence.get()) < sequence) {
                    barrier.checkAlert();
                    processorNotifyCondition.await();
                }
            } finally {
                lock.unlock();
            }
        }
        while ((availableSequence = dependentSequence.get()) < sequence) {
            barrier.checkAlert();
        }
        return availableSequence;
    }

    @Override
    public void signalAllWhenBlocking() {
        lock.lock();
        try {
            processorNotifyCondition.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

//Wait strategy that spins and then yields the thread
//Yielding strategy that uses a Thread.yield() for EventProcessors waiting on a barrier after an initially spinning.
//This strategy is a good compromise between performance and CPU resource without incurring significant latency spikes.
public final class YieldingWaitStrategy implements WaitStrategy {
    private static final int SPIN_TRIES = 100;

    @Override
    public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier) throws AlertException, InterruptedException {
        long availableSequence;
        int counter = SPIN_TRIES;
        while ((availableSequence = dependentSequence.get()) < sequence) {
            counter = applyWaitMethod(barrier, counter);
        }
        return availableSequence;
    }

    @Override
    public void signalAllWhenBlocking() {
    }

    private int applyWaitMethod(final SequenceBarrier barrier, int counter) throws AlertException {
        barrier.checkAlert();
        if (0 == counter) {
            //Yield the CPU so that another thread can run in the meantime
            Thread.yield();
        } else {
            --counter;
        }
        return counter;
    }
}

//Busy-spinning wait strategy
//Busy Spin strategy that uses a busy spin loop for EventProcessors waiting on a barrier.
//This strategy will use CPU resource to avoid syscalls which can introduce latency jitter.
//It is best used when threads can be bound to specific CPU cores.
public final class BusySpinWaitStrategy implements WaitStrategy {
    @Override
    public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier) throws AlertException, InterruptedException {
        long availableSequence;
        while ((availableSequence = dependentSequence.get()) < sequence) {
            barrier.checkAlert();
        }
        return availableSequence;
    }

    @Override
    public void signalAllWhenBlocking() {
    }
}

//Lightly blocking wait strategy (avoids contending for the lock when waking blocked threads)
//Variation of the BlockingWaitStrategy that attempts to elide conditional wake-ups when the lock is uncontended.
//Shows performance improvements on microbenchmarks.
//However this wait strategy should be considered experimental as I have not full proved the correctness of the lock elision code.
public final class LiteBlockingWaitStrategy implements WaitStrategy {
    private final Lock lock = new ReentrantLock();
    private final Condition processorNotifyCondition = lock.newCondition();
    private final AtomicBoolean signalNeeded = new AtomicBoolean(false);

    @Override
    public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier) throws AlertException, InterruptedException {
        long availableSequence;
        if ((availableSequence = cursorSequence.get()) < sequence) {
            lock.lock();
            try {
                do {
                    signalNeeded.getAndSet(true);
                    if ((availableSequence = cursorSequence.get()) >= sequence) {
                        break;
                    }
                    barrier.checkAlert();
                    processorNotifyCondition.await();
                } while ((availableSequence = cursorSequence.get()) < sequence);
            } finally {
                lock.unlock();
            }
        }
        while ((availableSequence = dependentSequence.get()) < sequence) {
            barrier.checkAlert();
        }
        return availableSequence;
    }

    @Override
    public void signalAllWhenBlocking() {
        if (signalNeeded.getAndSet(false)) {
            lock.lock();
            try {
                processorNotifyCondition.signalAll();
            } finally {
                lock.unlock();
            }
        }
    }
}

//SleepingWaitStrategy: yields the thread, then sleeps for the minimum interval
//Sleeping strategy that initially spins, then uses a Thread.yield(),
//and eventually sleep LockSupport.parkNanos(1) for the minimum number of nanos the OS
//and JVM will allow while the EventProcessors are waiting on a barrier.
//This strategy is a good compromise between performance and CPU resource.
//Latency spikes can occur after quiet periods.
public final class SleepingWaitStrategy implements WaitStrategy {
    private static final int DEFAULT_RETRIES = 200;
    private final int retries;

    public SleepingWaitStrategy() {
        this(DEFAULT_RETRIES);
    }

    public SleepingWaitStrategy(int retries) {
        this.retries = retries;
    }

    @Override
    public long waitFor(final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier) throws AlertException, InterruptedException {
        long availableSequence;
        int counter = retries;
        while ((availableSequence = dependentSequence.get()) < sequence) {
            counter = applyWaitMethod(barrier, counter);
        }
        return availableSequence;
    }

    @Override
    public void signalAllWhenBlocking() {
    }

    private int applyWaitMethod(final SequenceBarrier barrier, int counter) throws AlertException {
        barrier.checkAlert();
        if (counter > 100) {
            --counter;
        } else if (counter > 0) {
            --counter;
            Thread.yield();
        } else {
            LockSupport.parkNanos(1L);
        }
        return counter;
    }
}
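
The three phases of SleepingWaitStrategy are easy to miss in the counter arithmetic of applyWaitMethod(). The sketch below makes them explicit and counts how many failed checks fall into each phase. The names `SleepingPhasesSketch`, `Phase`, `phaseFor` and `countPhases` are hypothetical; the thresholds (retry budget passed in by the caller, 100 as the boundary between spinning and yielding) are taken from the listing above, and the alert check is omitted.

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical model that names the three phases of a spin/yield/park wait.
final class SleepingPhasesSketch {
    enum Phase { SPIN, YIELD, PARK }

    // Which phase a given counter value belongs to.
    static Phase phaseFor(int counter) {
        if (counter > 100) {
            return Phase.SPIN;
        }
        if (counter > 0) {
            return Phase.YIELD;
        }
        return Phase.PARK;
    }

    // Same decisions as applyWaitMethod() in the listing, minus barrier.checkAlert().
    static int applyWaitMethod(int counter) {
        switch (phaseFor(counter)) {
            case SPIN:
                return counter - 1;
            case YIELD:
                Thread.yield();
                return counter - 1;
            default:
                LockSupport.parkNanos(1L); // sleep for the shortest time the OS allows
                return counter;
        }
    }

    // Simulates `iterations` failed checks and returns {spins, yields, parks}.
    static int[] countPhases(int retries, int iterations) {
        int[] counts = new int[3];
        int counter = retries;
        for (int i = 0; i < iterations; i++) {
            counts[phaseFor(counter).ordinal()]++;
            counter = applyWaitMethod(counter);
        }
        return counts;
    }
}
```

With the default budget of 200 retries, `countPhases(200, 205)` yields 100 spins, 100 yields and 5 parks: a consumer that has been idle for a while ends up parked, which is the source of the latency spikes after quiet periods that the Javadoc mentions.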


Reposted from: 东阳马生架构

Original link: https://www.cnblogs.com/mjunz/p/18896162

