写点什么

Disruptor—核心源码实现分析

  • 2025-05-26
    福建
  • 本文字数:18519 字

    阅读完需:约 61 分钟

1.Disruptor 的生产者源码分析


(1)通过 Sequence 序号发布消息


生产者可以先从 RingBuffer 中获取一个可用的 Sequence 序号,然后再根据该 Sequence 序号从 RingBuffer 的环形数组中获取对应的元素,接着对该元素进行赋值替换,最后调用 RingBuffer 的 publish()方法设置当前生产者的 Sequence 序号来完成事件消息的发布。


//注意:这里使用的版本是3.4.4//单生产者单消费者的使用示例public class Main {    public static void main(String[] args) {        //参数准备        OrderEventFactory orderEventFactory = new OrderEventFactory();        int ringBufferSize = 4;        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());          //参数一:eventFactory,消息(Event)工厂对象        //参数二:ringBufferSize,容器的长度        //参数三:executor,线程池(建议使用自定义线程池),RejectedExecutionHandler        //参数四:ProducerType,单生产者还是多生产者        //参数五:waitStrategy,等待策略        //1.实例化Disruptor对象        Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(            orderEventFactory,            ringBufferSize,            executor,            ProducerType.SINGLE,            new BlockingWaitStrategy()        );          //2.添加Event处理器,用于处理事件        //也就是构建Disruptor与消费者的一个关联关系        disruptor.handleEventsWith(new OrderEventHandler());          //3.启动Disruptor        disruptor.start();          //4.获取实际存储数据的容器: RingBuffer        RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();        OrderEventProducer producer = new OrderEventProducer(ringBuffer);        ByteBuffer bb = ByteBuffer.allocate(8);        for (long i = 0; i < 5; i++) {            bb.putLong(0, i);            //向容器中投递数据            producer.sendData(bb);        }        disruptor.shutdown();        executor.shutdown();    }}
public class OrderEventProducer { private RingBuffer<OrderEvent> ringBuffer; public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void sendData(ByteBuffer data) { //1.在生产者发送消息时, 首先需要从ringBuffer里获取一个可用的序号 long sequence = ringBuffer.next(); try { //2.根据这个序号, 找到具体的"OrderEvent"元素 //注意:此时获取的OrderEvent对象是一个没有被赋值的"空对象" OrderEvent event = ringBuffer.get(sequence); //3.进行实际的赋值处理 event.setValue(data.getLong(0)); } finally { //4.提交发布操作 ringBuffer.publish(sequence); } }}
/**
 * Consumer for the single-producer example: pauses for one second, then prints the event value.
 */
public class OrderEventHandler implements EventHandler<OrderEvent> {
    /**
     * Handles one event.
     *
     * @param event      the event read from the ring buffer
     * @param sequence   sequence of the event (unused here)
     * @param endOfBatch batch flag passed by the caller (unused here)
     * @throws Exception if the sleep is interrupted
     */
    public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
        // Simulated processing time.
        final long pauseMillis = 1000;
        Thread.sleep(pauseMillis);

        final String line = "消费者: " + event.getValue();
        System.err.println(line);
    }
}
复制代码


//多生产者多消费者的使用示例public class Main {    public static void main(String[] args) throws InterruptedException {        //1.创建RingBuffer        RingBuffer<Order> ringBuffer = RingBuffer.create(            ProducerType.MULTI,//多生产者            new EventFactory<Order>() {                public Order newInstance() {                    return new Order();                }            },            1024 * 1024,            new YieldingWaitStrategy()        );
//2.通过ringBuffer创建一个屏障 SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
//3.创建消费者数组,每个消费者Consumer都需要实现WorkHandler接口 Consumer[] consumers = new Consumer[10]; for (int i = 0; i < consumers.length; i++) { consumers[i] = new Consumer("C" + i); }
//4.构建多消费者工作池WorkerPool,因为多消费者模式下需要使用WorkerPool WorkerPool<Order> workerPool = new WorkerPool<Order>( ringBuffer, sequenceBarrier, new EventExceptionHandler(), consumers );
//5.设置多个消费者的sequence序号,用于单独统计每个消费者的消费进度, 并且设置到RingBuffer中 ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
//6.启动workerPool workerPool.start(Executors.newFixedThreadPool(5));
final CountDownLatch latch = new CountDownLatch(1); for (int i = 0; i < 100; i++) { final Producer producer = new Producer(ringBuffer); new Thread(new Runnable() { public void run() { try { latch.await(); } catch (Exception e) { e.printStackTrace(); } for (int j = 0; j < 100; j++) { producer.sendData(UUID.randomUUID().toString()); } } }).start(); }
Thread.sleep(2000); System.err.println("----------线程创建完毕,开始生产数据----------"); latch.countDown(); Thread.sleep(10000); System.err.println("任务总数:" + consumers[2].getCount()); }}
public class Producer { private RingBuffer<Order> ringBuffer; public Producer(RingBuffer<Order> ringBuffer) { this.ringBuffer = ringBuffer; }
public void sendData(String uuid) { //1.在生产者发送消息时, 首先需要从ringBuffer里获取一个可用的序号 long sequence = ringBuffer.next(); try { //2.根据这个序号, 找到具体的"Order"元素 //注意:此时获取的Order对象是一个没有被赋值的"空对象" Order order = ringBuffer.get(sequence); //3.进行实际的赋值处理 order.setId(uuid); } finally { //4.提交发布操作 ringBuffer.publish(sequence); } }}
/**
 * Work handler for the multi-consumer example. Every handled order is printed and counted.
 *
 * <p>The counter is a static field, so it is shared by all {@code Consumer} instances and
 * {@link #getCount()} returns the same total whichever instance it is called on.
 */
public class Consumer implements WorkHandler<Order> {
    private static AtomicInteger count = new AtomicInteger(0);

    private String consumerId;
    private Random random = new Random();

    public Consumer(String consumerId) {
        this.consumerId = consumerId;
    }

    /**
     * Sleeps for a random 0-4 ms, prints the order id and increments the shared counter.
     *
     * @param event the order to handle
     * @throws Exception if the sleep is interrupted
     */
    public void onEvent(Order event) throws Exception {
        final long pauseMillis = 1 * random.nextInt(5);
        Thread.sleep(pauseMillis);

        final String line = "当前消费者: " + this.consumerId + ", 消费信息ID: " + event.getId();
        System.err.println(line);

        count.incrementAndGet();
    }

    /**
     * @return number of orders handled so far by all consumers together
     */
    public int getCount() {
        return count.get();
    }
}
复制代码


其中,RingBuffer 的 publish(sequence)方法会调用 Sequencer 接口的 publish()方法来设置当前生产者的 Sequence 序号。


// Base class that only declares seven long fields; nothing in the visible code reads or writes them.
// NOTE(review): presumably cache-line padding in front of the fields of RingBufferFields
// (false-sharing mitigation) - confirm against the Disruptor documentation.
abstract class RingBufferPad {    protected long p1, p2, p3, p4, p5, p6, p7;}
/**
 * Holds the ring buffer's storage array together with the sizes and masks used to address it.
 *
 * <p>{@code BUFFER_PAD}, {@code REF_ARRAY_BASE} and {@code REF_ELEMENT_SHIFT} are declared in the
 * elided part of the class and are not shown in this excerpt.
 */
abstract class RingBufferFields<E> extends RingBufferPad {
    ...
    private static final Unsafe UNSAFE = Util.getUnsafe();

    /** {@code bufferSize - 1}; AND-ing a sequence with it yields the slot index. */
    private final long indexMask;

    /** Ring array that stores the event objects. */
    private final Object[] entries;

    protected final int bufferSize;

    /** Sequencer passed to the constructor; the article describes it as representing the producer. */
    protected final Sequencer sequencer;

    /**
     * Validates the buffer size reported by the sequencer, allocates the array and pre-fills it.
     *
     * @param eventFactory factory used to create one event per slot
     * @param sequencer    sequencer that supplies the buffer size
     * @throws IllegalArgumentException if the buffer size is below 1 or not a power of two
     */
    RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();

        if (bufferSize < 1) {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        final boolean powerOfTwo = Integer.bitCount(bufferSize) == 1;
        if (!powerOfTwo) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }

        this.indexMask = bufferSize - 1;
        // Allocate the array with BUFFER_PAD extra slots on each side of the used range.
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        // Pre-allocate every event up front.
        fill(eventFactory);
    }

    /** Stores a fresh event in each of the {@code bufferSize} used slots. */
    private void fill(EventFactory<E> eventFactory) {
        for (int slot = 0; slot < bufferSize; slot++) {
            entries[BUFFER_PAD + slot] = eventFactory.newInstance();
        }
    }

    /**
     * Reads the event stored for {@code sequence}.
     *
     * @param sequence sequence whose slot is read
     * @return the event object held in that slot
     */
    protected final E elementAt(long sequence) {
        final long slot = sequence & indexMask;
        final long offset = REF_ARRAY_BASE + (slot << REF_ELEMENT_SHIFT);
        return (E) UNSAFE.getObject(entries, offset);
    }
    ...
}
public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> {
    protected long p1, p2, p3, p4, p5, p6, p7;
    ...

    /**
     * Increment and return the next sequence for the ring buffer.
     * Callers of this method should ensure that they always publish the sequence afterward, e.g.
     * <pre>
     * long sequence = ringBuffer.next();
     * try {
     *     Event e = ringBuffer.get(sequence);
     *     // Do some work with the event.
     * } finally {
     *     ringBuffer.publish(sequence);
     * }
     * </pre>
     *
     * @return the next sequence to publish to
     * @see RingBuffer#publish(long)
     * @see RingBuffer#get(long)
     */
    @Override
    public long next() {
        final long claimed = sequencer.next();
        return claimed;
    }

    /**
     * Publish the specified sequence.
     * This action marks this particular message as being available to be read.
     *
     * @param sequence the sequence to publish
     */
    @Override
    public void publish(long sequence) {
        sequencer.publish(sequence);
    }

    /**
     * Get the event for a given sequence in the RingBuffer. This call has two uses.
     *
     * <p>First, when publishing: after calling {@link RingBuffer#next()}, use this call to get
     * hold of the preallocated event and fill it with data before calling
     * {@link RingBuffer#publish(long)}.
     *
     * <p>Second, when consuming: after calling {@code SequenceBarrier#waitFor(long)}, call this
     * method with any value greater than your current consumer sequence and less than or equal
     * to the value returned from {@code SequenceBarrier#waitFor(long)}.
     *
     * @param sequence sequence of the event
     * @return the event for the given sequence
     */
    @Override
    public E get(long sequence) {
        // Delegates to elementAt() of the parent class RingBufferFields.
        final E event = elementAt(sequence);
        return event;
    }
    ...
}
复制代码


RingBuffer 的 sequencer 属性会在创建 RingBuffer 对象时传入,而创建 RingBuffer 对象的时机则是在初始化 Disruptor 的时候。

 

在 Disruptor 的构造方法中,会调用 RingBuffer 的 create()方法,RingBuffer 的 create()方法会根据不同的生产者类型来初始化 sequencer 属性。

 

由生产者线程通过 new 创建的 Sequencer 接口实现类的实例就是一个生产者。单生产者的线程执行上面单生产者示例的 main()方法时,会创建一个单生产者 Sequencer 实例(SingleProducerSequencer)来代表生产者。多生产者的线程执行上面多生产者示例的 main()方法时,会创建一个多生产者 Sequencer 实例(MultiProducerSequencer)来代表生产者。


public class Disruptor<T> {
    private final RingBuffer<T> ringBuffer;
    private final Executor executor;
    private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<T>();
    private final AtomicBoolean started = new AtomicBoolean(false);
    private ExceptionHandler<? super T> exceptionHandler;
    ...

    /**
     * Create a new Disruptor. The ring buffer is built by
     * {@code RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy)}.
     *
     * @param eventFactory   the factory to create events in the ring buffer
     * @param ringBufferSize the size of the ring buffer, must be a power of 2
     * @param executor       an Executor to execute event processors
     * @param producerType   the claim strategy to use for the ring buffer
     * @param waitStrategy   the wait strategy to use for the ring buffer
     */
    public Disruptor(
        final EventFactory<T> eventFactory,
        final int ringBufferSize,
        final Executor executor,
        final ProducerType producerType,
        final WaitStrategy waitStrategy
    ) {
        this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), executor);
    }

    /**
     * Stores the already-built ring buffer and the executor.
     *
     * @param ringBuffer ring buffer this Disruptor works on
     * @param executor   executor kept for later use
     */
    private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor) {
        this.executor = executor;
        this.ringBuffer = ringBuffer;
    }
    ...
}
public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> {
    protected long p1, p2, p3, p4, p5, p6, p7;
    ...

    /**
     * Create a new Ring Buffer with the specified producer type (SINGLE or MULTI).
     *
     * @param <E>          class of the event stored in the ring buffer
     * @param producerType producer type to use
     * @param factory      used to create events within the ring buffer
     * @param bufferSize   number of elements to create within the ring buffer
     * @param waitStrategy used to determine how to wait for new elements to become available
     * @return a constructed ring buffer
     * @throws IllegalStateException if the producer type is neither SINGLE nor MULTI
     */
    public static <E> RingBuffer<E> create(
        ProducerType producerType,
        EventFactory<E> factory,
        int bufferSize,
        WaitStrategy waitStrategy
    ) {
        switch (producerType) {
            case SINGLE:
                // Single-producer mode: the producer is a SingleProducerSequencer instance.
                return createSingleProducer(factory, bufferSize, waitStrategy);
            case MULTI:
                // Multi-producer mode: the producer is a MultiProducerSequencer instance.
                return createMultiProducer(factory, bufferSize, waitStrategy);
            default:
                throw new IllegalStateException(producerType.toString());
        }
    }

    /**
     * Create a new single producer RingBuffer with the specified wait strategy.
     *
     * @param <E>          class of the event stored in the ring buffer
     * @param factory      used to create the events within the ring buffer
     * @param bufferSize   number of elements to create within the ring buffer
     * @param waitStrategy used to determine how to wait for new elements to become available
     * @return a constructed ring buffer
     */
    public static <E> RingBuffer<E> createSingleProducer(
        EventFactory<E> factory,
        int bufferSize,
        WaitStrategy waitStrategy
    ) {
        return new RingBuffer<E>(factory, new SingleProducerSequencer(bufferSize, waitStrategy));
    }

    /**
     * Create a new multiple producer RingBuffer with the specified wait strategy.
     *
     * @param <E>          class of the event stored in the ring buffer
     * @param factory      used to create the events within the ring buffer
     * @param bufferSize   number of elements to create within the ring buffer
     * @param waitStrategy used to determine how to wait for new elements to become available
     * @return a constructed ring buffer
     */
    public static <E> RingBuffer<E> createMultiProducer(
        EventFactory<E> factory,
        int bufferSize,
        WaitStrategy waitStrategy
    ) {
        return new RingBuffer<E>(factory, new MultiProducerSequencer(bufferSize, waitStrategy));
    }

    /**
     * Construct a RingBuffer with the full option set.
     *
     * @param eventFactory to newInstance entries for filling the RingBuffer
     * @param sequencer    sequencer to handle the ordering of events moving through the RingBuffer
     */
    RingBuffer(EventFactory<E> eventFactory, Sequencer sequencer) {
        super(eventFactory, sequencer);
    }
    ...
}
// Base class that only declares seven long fields; nothing in the visible code reads or writes them.
// NOTE(review): presumably cache-line padding in front of the fields of RingBufferFields
// (false-sharing mitigation) - confirm against the Disruptor documentation.
abstract class RingBufferPad { protected long p1, p2, p3, p4, p5, p6, p7;}
/**
 * Holds the ring buffer's storage array together with the sizes and masks used to address it.
 *
 * <p>{@code BUFFER_PAD} is declared in the elided part of the class and is not shown in this
 * excerpt.
 */
abstract class RingBufferFields<E> extends RingBufferPad {
    ...
    /** {@code bufferSize - 1}. */
    private final long indexMask;

    /** Ring array that stores the event objects. */
    private final Object[] entries;

    protected final int bufferSize;

    /** Sequencer passed to the constructor; the article describes it as representing the producer. */
    protected final Sequencer sequencer;

    /**
     * Validates the buffer size reported by the sequencer, allocates the array and pre-fills it.
     *
     * @param eventFactory factory used to create one event per slot
     * @param sequencer    sequencer that supplies the buffer size
     * @throws IllegalArgumentException if the buffer size is below 1 or not a power of two
     */
    RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();

        if (bufferSize < 1) {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        final boolean powerOfTwo = Integer.bitCount(bufferSize) == 1;
        if (!powerOfTwo) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }

        this.indexMask = bufferSize - 1;
        // Allocate the array with BUFFER_PAD extra slots on each side of the used range.
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        // Pre-allocate every event up front.
        fill(eventFactory);
    }

    /** Stores a fresh event in each of the {@code bufferSize} used slots. */
    private void fill(EventFactory<E> eventFactory) {
        for (int slot = 0; slot < bufferSize; slot++) {
            entries[BUFFER_PAD + slot] = eventFactory.newInstance();
        }
    }
    ...
}
复制代码


SingleProducerSequencer 的 publish()方法在发布事件消息时,首先会设置当前生产者的 Sequence,然后会通过等待策略通知阻塞的消费者。


public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> {
    ...

    /**
     * Publish the specified sequence by delegating to the sequencer.
     * This action marks this particular message as being available to be read.
     *
     * @param sequence the sequence to publish
     */
    @Override
    public void publish(long sequence) {
        final Sequencer target = sequencer;
        target.publish(sequence);
    }
    ...
}
public abstract class AbstractSequencer implements Sequencer { private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences"); //环形数组的大小 protected final int bufferSize; //等待策略 protected final WaitStrategy waitStrategy; //当前生产者的进度 protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); //每一个Sequence都对应着一个消费者(一个EventHandler或者一个WorkHandler) //这些Sequence会通过SEQUENCE_UPDATER在执行Disruptor的handleEventsWith()等方法时, //由RingBuffer的addGatingSequences()方法进行添加 protected volatile Sequence[] gatingSequences = new Sequence[0]; ... //Create with the specified buffer size and wait strategy. //@param bufferSize The total number of entries, must be a positive power of 2. //@param waitStrategy public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy) { if (bufferSize < 1) { throw new IllegalArgumentException("bufferSize must not be less than 1"); } if (Integer.bitCount(bufferSize) != 1) { throw new IllegalArgumentException("bufferSize must be a power of 2"); } this.bufferSize = bufferSize; this.waitStrategy = waitStrategy; } ...}
// Adds seven long fields on top of AbstractSequencer and forwards the constructor arguments
// unchanged; nothing in the visible code reads or writes the fields.
// NOTE(review): presumably cache-line padding in front of the fields declared by
// SingleProducerSequencerFields - confirm against the Disruptor documentation.
abstract class SingleProducerSequencerPad extends AbstractSequencer { protected long p1, p2, p3, p4, p5, p6, p7; public SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy) { super(bufferSize, waitStrategy); }}
abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad { public SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy) { super(bufferSize, waitStrategy); } //表示生产者的当前序号,值为-1 protected long nextValue = Sequence.INITIAL_VALUE; //表示消费者的最小序号,值为-1 protected long cachedValue = Sequence.INITIAL_VALUE;}
public final class SingleProducerSequencer extends SingleProducerSequencerFields { protected long p1, p2, p3, p4, p5, p6, p7; //Construct a Sequencer with the selected wait strategy and buffer size. //@param bufferSize the size of the buffer that this will sequence over. //@param waitStrategy for those waiting on sequences. public SingleProducerSequencer(int bufferSize, WaitStrategy waitStrategy) { super(bufferSize, waitStrategy); }
@Override public void publish(long sequence) { //设置当前生产者的进度,cursor代表了当前生产者的Sequence cursor.set(sequence); //通过等待策略通知阻塞的消费者 waitStrategy.signalAllWhenBlocking(); } @Override public long next() { return next(1); } @Override public long next(int n) { if (n < 1) { throw new IllegalArgumentException("n must be > 0"); } long nextValue = this.nextValue; long nextSequence = nextValue + n; long wrapPoint = nextSequence - bufferSize; long cachedGatingSequence = this.cachedValue; if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) { long minSequence; while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) { LockSupport.parkNanos(1L); } this.cachedValue = minSequence; } this.nextValue = nextSequence; return nextSequence; } ...}
// Seven long fields declared ahead of the volatile field that the subclass Value adds.
// They are the padding the Sequence class comment refers to (false-sharing mitigation).
class LhsPadding { protected long p1, p2, p3, p4, p5, p6, p7;}
// Declares the volatile long that holds a Sequence's value. Sequence looks this field up
// reflectively by the name "value" to obtain its offset for the Unsafe operations.
class Value extends LhsPadding { protected volatile long value;}
// Seven long fields declared after the volatile field inherited from Value.
// They are the padding the Sequence class comment refers to (false-sharing mitigation).
class RhsPadding extends Value { protected long p9, p10, p11, p12, p13, p14, p15;}
//Concurrent sequence class used for tracking the progress of the ring buffer and event processors. //Support a number of concurrent operations including CAS and order writes.//Also attempts to be more efficient with regards to false sharing by adding padding around the volatile field.public class Sequence extends RhsPadding { static final long INITIAL_VALUE = -1L; private static final Unsafe UNSAFE; private static final long VALUE_OFFSET;
static { UNSAFE = Util.getUnsafe(); VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value")); }
//Create a sequence initialised to -1. public Sequence() { this(INITIAL_VALUE); }
//Create a sequence with a specified initial value. //@param initialValue The initial value for this sequence. public Sequence(final long initialValue) { UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue); }
//Perform a volatile read of this sequence's value. //@return The current value of the sequence. public long get() { return value; }
//Perform an ordered write of this sequence. //The intent is a Store/Store barrier between this write and any previous store. //@param value The new value for the sequence. public void set(final long value) { UNSAFE.putOrderedLong(this, VALUE_OFFSET, value); }
//Performs a volatile write of this sequence. //The intent is a Store/Store barrier between this write and //any previous write and a Store/Load barrier between this write and //any subsequent volatile read. //@param value The new value for the sequence. public void setVolatile(final long value) { UNSAFE.putLongVolatile(this, VALUE_OFFSET, value); }
//Perform a compare and set operation on the sequence. //@param expectedValue The expected current value. //@param newValue The value to update to. //@return true if the operation succeeds, false otherwise. public boolean compareAndSet(final long expectedValue, final long newValue) { return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue); }
//Atomically increment the sequence by one. //@return The value after the increment public long incrementAndGet() { return addAndGet(1L); }
//Atomically add the supplied value. //@param increment The value to add to the sequence. //@return The value after the increment. public long addAndGet(final long increment) { long currentValue; long newValue; do { currentValue = get(); newValue = currentValue + increment; } while (!compareAndSet(currentValue, newValue)); return newValue; }
@Override public String toString() { return Long.toString(get()); }}
复制代码


MultiProducerSequencer 的 publish()方法在发布事件消息时,则会通过 Unsafe 设置 sequence 在 int 数组 availableBuffer 中对应元素的值,然后同样通过等待策略通知阻塞的消费者。


public final class MultiProducerSequencer extends AbstractSequencer {
    private static final Unsafe UNSAFE = Util.getUnsafe();
    private static final long BASE = UNSAFE.arrayBaseOffset(int[].class);
    private static final long SCALE = UNSAFE.arrayIndexScale(int[].class);

    /** Cached minimum gating sequence, read and refreshed by {@link #next(int)}. */
    private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);

    /** One availability flag per ring buffer slot. */
    private final int[] availableBuffer;
    /** {@code bufferSize - 1}; masks a sequence down to its slot index. */
    private final int indexMask;
    /** {@code log2(bufferSize)}; shifts a sequence down to its availability flag. */
    private final int indexShift;

    /**
     * Construct a Sequencer with the selected wait strategy and buffer size.
     *
     * @param bufferSize   the size of the buffer that this will sequence over
     * @param waitStrategy for those waiting on sequences
     */
    public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy) {
        super(bufferSize, waitStrategy);
        availableBuffer = new int[bufferSize];
        indexMask = bufferSize - 1;
        indexShift = Util.log2(bufferSize);
        initialiseAvailableBuffer();
    }

    /** Sets every availability flag to -1. */
    private void initialiseAvailableBuffer() {
        for (int i = availableBuffer.length - 1; i != 0; i--) {
            setAvailableBufferValue(i, -1);
        }
        setAvailableBufferValue(0, -1);
    }

    /**
     * Writes {@code flag} into {@code availableBuffer[index]} with an ordered Unsafe store.
     *
     * @param index slot index
     * @param flag  availability flag to store
     */
    private void setAvailableBufferValue(int index, int flag) {
        long bufferAddress = (index * SCALE) + BASE;
        UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
    }

    /**
     * Publishes {@code sequence}: marks its slot as available, then signals through the wait
     * strategy.
     *
     * @param sequence the sequence to publish
     */
    @Override
    public void publish(final long sequence) {
        setAvailable(sequence);
        waitStrategy.signalAllWhenBlocking();
    }

    /*
     * The below methods work on the availableBuffer flag.
     * The prime reason is to avoid a shared sequence object between publisher threads.
     * (Keeping single pointers tracking start and end would require coordination between the
     * threads.)
     * -- Firstly we have the constraint that the delta between the cursor and minimum gating
     *    sequence will never be larger than the buffer size (the code in next/tryNext in the
     *    Sequence takes care of that).
     * -- Given that; take the sequence value and mask off the lower portion of the sequence as
     *    the index into the buffer (indexMask). (aka modulo operator)
     * -- The upper portion of the sequence becomes the value to check for availability.
     *    ie: it tells us how many times around the ring buffer we've been (aka division)
     * -- Because we can't wrap without the gating sequences moving forward (i.e. the minimum
     *    gating sequence is effectively our last available position in the buffer), when we
     *    have new data and successfully claimed a slot we can simply write over the top.
     */
    private void setAvailable(final long sequence) {
        setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
    }

    /** Slot index of {@code sequence}: its low bits selected by {@code indexMask}. */
    private int calculateIndex(final long sequence) {
        return ((int) sequence) & indexMask;
    }

    /** Availability flag of {@code sequence}: the sequence shifted right by {@code indexShift}. */
    private int calculateAvailabilityFlag(final long sequence) {
        return (int) (sequence >>> indexShift);
    }

    /**
     * Claims the next single sequence.
     *
     * @return the claimed sequence
     */
    @Override
    public long next() {
        return next(1);
    }

    /**
     * Claims the next {@code n} sequences, retrying until the cursor CAS succeeds and parking
     * 1 ns at a time while the ring buffer has no room.
     *
     * @param n number of sequences to claim
     * @return the highest claimed sequence
     * @throws IllegalArgumentException if {@code n} is less than 1
     */
    @Override
    public long next(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("n must be > 0");
        }
        long current;
        long next;
        do {
            current = cursor.get();
            next = current + n;
            long wrapPoint = next - bufferSize;
            long cachedGatingSequence = gatingSequenceCache.get();
            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {
                // The cache cannot prove there is room: re-read the real minimum.
                long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
                if (wrapPoint > gatingSequence) {
                    LockSupport.parkNanos(1);
                    continue;
                }
                gatingSequenceCache.set(gatingSequence);
            } else if (cursor.compareAndSet(current, next)) {
                // The CAS claims the range; losing it means another producer moved the cursor.
                break;
            }
        } while (true);
        return next;
    }
    ...
}
复制代码


(2)通过 Translator 事件转换器发布消息


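生产者还可以直接调用 RingBuffer 的 tryPublishEvent()方法来完成发布事件消息到 RingBuffer。该方法首先会调用 Sequencer 接口的 tryNext()方法获取 sequence 序号(如果 RingBuffer 容量不足,tryNext()会抛出 InsufficientCapacityException,此时 tryPublishEvent()直接返回 false),接着调用 RingBuffer 的 translateAndPublish()方法:先根据该 sequence 序号从 RingBuffer 的环形数组中获取对应的元素,再通过 EventTranslator 的 translateTo()方法将事件消息赋值替换到该元素中,最后在 finally 块中调用 Sequencer 接口的 publish()方法设置当前生产者的 sequence 序号来完成事件消息的发布。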


// Base class that only declares seven long fields; nothing in the visible code reads or writes them.
// NOTE(review): presumably cache-line padding in front of the fields of RingBufferFields
// (false-sharing mitigation) - confirm against the Disruptor documentation.
abstract class RingBufferPad {    protected long p1, p2, p3, p4, p5, p6, p7;}
/**
 * Holds the ring buffer's storage array together with the sizes and masks used to address it.
 *
 * <p>{@code BUFFER_PAD}, {@code REF_ARRAY_BASE} and {@code REF_ELEMENT_SHIFT} are declared in the
 * elided part of the class and are not shown in this excerpt.
 */
abstract class RingBufferFields<E> extends RingBufferPad {
    ...
    private static final Unsafe UNSAFE = Util.getUnsafe();

    /** {@code bufferSize - 1}; AND-ing a sequence with it yields the slot index. */
    private final long indexMask;

    /** Ring array that stores the event objects. */
    private final Object[] entries;

    protected final int bufferSize;

    /** Sequencer passed to the constructor; the article describes it as representing the producer. */
    protected final Sequencer sequencer;

    /**
     * Validates the buffer size reported by the sequencer, allocates the array and pre-fills it.
     *
     * @param eventFactory factory used to create one event per slot
     * @param sequencer    sequencer that supplies the buffer size
     * @throws IllegalArgumentException if the buffer size is below 1 or not a power of two
     */
    RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
        this.sequencer = sequencer;
        this.bufferSize = sequencer.getBufferSize();

        if (bufferSize < 1) {
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        final boolean powerOfTwo = Integer.bitCount(bufferSize) == 1;
        if (!powerOfTwo) {
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }

        this.indexMask = bufferSize - 1;
        // Allocate the array with BUFFER_PAD extra slots on each side of the used range.
        this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        // Pre-allocate every event up front.
        fill(eventFactory);
    }

    /** Stores a fresh event in each of the {@code bufferSize} used slots. */
    private void fill(EventFactory<E> eventFactory) {
        for (int slot = 0; slot < bufferSize; slot++) {
            entries[BUFFER_PAD + slot] = eventFactory.newInstance();
        }
    }

    /**
     * Reads the event stored for {@code sequence}.
     *
     * @param sequence sequence whose slot is read
     * @return the event object held in that slot
     */
    protected final E elementAt(long sequence) {
        final long slot = sequence & indexMask;
        final long offset = REF_ARRAY_BASE + (slot << REF_ELEMENT_SHIFT);
        return (E) UNSAFE.getObject(entries, offset);
    }
    ...
}
public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> { //值为-1 public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE; protected long p1, p2, p3, p4, p5, p6, p7;
//Construct a RingBuffer with the full option set. //@param eventFactory to newInstance entries for filling the RingBuffer //@param sequencer sequencer to handle the ordering of events moving through the RingBuffer. RingBuffer(EventFactory<E> eventFactory, Sequencer sequencer) { super(eventFactory, sequencer); } @Override public boolean tryPublishEvent(EventTranslator<E> translator) { try { final long sequence = sequencer.tryNext(); translateAndPublish(translator, sequence); return true; } catch (InsufficientCapacityException e) { return false; } } private void translateAndPublish(EventTranslator<E> translator, long sequence) { try { translator.translateTo(get(sequence), sequence); } finally { sequencer.publish(sequence); } } //Get the event for a given sequence in the RingBuffer. //This call has 2 uses. //Firstly use this call when publishing to a ring buffer. //After calling RingBuffer#next() use this call to get hold of the preallocated event to fill with data before calling RingBuffer#publish(long). //Secondly use this call when consuming data from the ring buffer. //After calling SequenceBarrier#waitFor(long) call this method with any value greater than that //your current consumer sequence and less than or equal to the value returned from the SequenceBarrier#waitFor(long) method. //@param sequence for the event //@return the event for the given sequence @Override public E get(long sequence) { //调用父类RingBufferFields的elementAt()方法 return elementAt(sequence); } ...}
public abstract class AbstractSequencer implements Sequencer { private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences"); //环形数组的大小 protected final int bufferSize; //等待策略 protected final WaitStrategy waitStrategy; //当前生产者的进度 protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); //每一个Sequence都对应着一个消费者(一个EventHandler或者一个WorkHandler) //这些Sequence会通过SEQUENCE_UPDATER在执行Disruptor的handleEventsWith()等方法时, //由RingBuffer的addGatingSequences()方法进行添加 protected volatile Sequence[] gatingSequences = new Sequence[0]; ... //Create with the specified buffer size and wait strategy. //@param bufferSize The total number of entries, must be a positive power of 2. //@param waitStrategy public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy) { if (bufferSize < 1) { throw new IllegalArgumentException("bufferSize must not be less than 1"); } if (Integer.bitCount(bufferSize) != 1) { throw new IllegalArgumentException("bufferSize must be a power of 2"); } this.bufferSize = bufferSize; this.waitStrategy = waitStrategy; } ...}
public final class SingleProducerSequencer extends SingleProducerSequencerFields { protected long p1, p2, p3, p4, p5, p6, p7; //Construct a Sequencer with the selected wait strategy and buffer size. //@param bufferSize the size of the buffer that this will sequence over. //@param waitStrategy for those waiting on sequences. public SingleProducerSequencer(int bufferSize, WaitStrategy waitStrategy) { super(bufferSize, waitStrategy); } ... @Override public long tryNext() throws InsufficientCapacityException { return tryNext(1); }
@Override public long tryNext(int n) throws InsufficientCapacityException { if (n < 1) { throw new IllegalArgumentException("n must be > 0"); } if (!hasAvailableCapacity(n, true)) { throw InsufficientCapacityException.INSTANCE; } long nextSequence = this.nextValue += n; return nextSequence; } private boolean hasAvailableCapacity(int requiredCapacity, boolean doStore) { long nextValue = this.nextValue; long wrapPoint = (nextValue + requiredCapacity) - bufferSize; long cachedGatingSequence = this.cachedValue; if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) { if (doStore) { cursor.setVolatile(nextValue);//StoreLoad fence } long minSequence = Util.getMinimumSequence(gatingSequences, nextValue); this.cachedValue = minSequence; if (wrapPoint > minSequence) { return false; } } return true; } @Override public void publish(long sequence) { //设置当前生产者的sequence cursor.set(sequence); //通过等待策略通知阻塞的消费者 waitStrategy.signalAllWhenBlocking(); } ...}
abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad { SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy) { super(bufferSize, waitStrategy); } //表示生产者的当前序号,值为-1 protected long nextValue = Sequence.INITIAL_VALUE; //表示消费者的最小序号,值为-1 protected long cachedValue = Sequence.INITIAL_VALUE;}
// Adds seven long fields on top of AbstractSequencer and forwards the constructor arguments
// unchanged; nothing in the visible code reads or writes the fields.
// NOTE(review): presumably cache-line padding in front of the fields declared by
// SingleProducerSequencerFields - confirm against the Disruptor documentation.
abstract class SingleProducerSequencerPad extends AbstractSequencer { protected long p1, p2, p3, p4, p5, p6, p7; SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy) { super(bufferSize, waitStrategy); }}
public final class MultiProducerSequencer extends AbstractSequencer {
    ...

    /**
     * Tries to claim the next single sequence without waiting.
     *
     * @return the claimed sequence
     * @throws InsufficientCapacityException if the ring buffer has no free slot
     */
    @Override
    public long tryNext() throws InsufficientCapacityException {
        return tryNext(1);
    }

    /**
     * Tries to claim the next {@code n} sequences without waiting. The claim is made with a CAS
     * on the cursor and is retried when the CAS fails.
     *
     * @param n number of sequences to claim
     * @return the highest claimed sequence
     * @throws IllegalArgumentException      if {@code n} is less than 1
     * @throws InsufficientCapacityException if fewer than {@code n} slots are free
     */
    @Override
    public long tryNext(int n) throws InsufficientCapacityException {
        if (n < 1) {
            throw new IllegalArgumentException("n must be > 0");
        }

        long observed;
        long claimedUpTo;
        do {
            observed = cursor.get();
            claimedUpTo = observed + n;
            if (!hasAvailableCapacity(gatingSequences, n, observed)) {
                throw InsufficientCapacityException.INSTANCE;
            }
        } while (!cursor.compareAndSet(observed, claimedUpTo));

        return claimedUpTo;
    }

    /**
     * Checks whether {@code requiredCapacity} more sequences fit after {@code cursorValue},
     * refreshing the cached minimum gating sequence when the cached value cannot answer the
     * question.
     *
     * @param gatingSequences  sequences whose minimum limits how far producers may advance
     * @param requiredCapacity number of sequences the caller wants to claim
     * @param cursorValue      cursor value the claim starts from
     * @return true if the capacity is available
     */
    private boolean hasAvailableCapacity(Sequence[] gatingSequences, final int requiredCapacity, long cursorValue) {
        final long wrapPoint = (cursorValue + requiredCapacity) - bufferSize;
        final long cachedGating = gatingSequenceCache.get();

        // Fast path: the cached minimum already proves that there is room.
        if (wrapPoint <= cachedGating && cachedGating <= cursorValue) {
            return true;
        }

        final long minSequence = Util.getMinimumSequence(gatingSequences, cursorValue);
        gatingSequenceCache.set(minSequence);
        return wrapPoint <= minSequence;
    }

    /**
     * Publishes {@code sequence}: marks it as available, then signals through the wait strategy.
     *
     * @param sequence the sequence to publish
     */
    @Override
    public void publish(final long sequence) {
        setAvailable(sequence);
        waitStrategy.signalAllWhenBlocking();
    }
    ...
}
/**
 * Implementations translate (write) data representations into events claimed from the RingBuffer.
 *
 * <p>When publishing to the RingBuffer, provide an EventTranslator. The RingBuffer will select
 * the next available event by sequence and provide it to the EventTranslator (which should
 * update the event), before publishing the sequence update.
 *
 * @param <T> event implementation storing the data for sharing during exchange or parallel
 *            coordination of an event
 */
public interface EventTranslator<T> {
    /**
     * Translate a data representation into fields set in the given event.
     *
     * @param event    into which the data should be translated
     * @param sequence that is assigned to the event
     */
    void translateTo(T event, long sequence);
}
复制代码


文章转载自:东阳马生架构

原文链接:https://www.cnblogs.com/mjunz/p/18896162

体验地址:http://www.jnpfsoft.com/?from=001YH

用户头像

还未添加个人签名 2023-06-19 加入

还未添加个人简介

评论

发布
暂无评论
Disruptor—核心源码实现分析_Java_不在线第一只蜗牛_InfoQ写作社区