写点什么

Disruptor—核心源码实现分析(三)

  • 2025-05-28
    福建
  • 本文字数:11436 字

    阅读完需:约 38 分钟

4.Disruptor 的高性能原因


一.使用了环形结构 + 数组 + 内存预加载

二.使用了单线程写的方式并配合内存屏障

三.消除伪共享(填充缓存行)

四.序号栅栏和序号配合使用来消除锁

五.提供了多种不同性能的等待策略

 

5.Disruptor 高性能之数据结构(内存预加载机制)


(1)RingBuffer 使用环形数组来存储元素


环形数组可以避免数组扩容和缩容带来的性能损耗。

 

(2)RingBuffer 采用了内存预加载机制


初始化 RingBuffer 时,会将 entries 数组里的每一个元素都先 new 出来。比如 RingBuffer 的大小设置为 8,那么初始化 RingBuffer 时,就会先将 entries 数组的 8 个元素分别指向新 new 出来的空的 Event 对象。往 RingBuffer 填充元素时,只是将对应的 Event 对象进行赋值。所以 RingBuffer 中的 Event 对象是一直存活着的,也就是说它能最小程度减少系统 GC 频率,从而提升性能。


public class Main {    public static void main(String[] args) {        //参数准备        OrderEventFactory orderEventFactory = new OrderEventFactory();        int ringBufferSize = 4;        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());            //参数一:eventFactory,消息(Event)工厂对象        //参数二:ringBufferSize,容器的长度        //参数三:executor,线程池(建议使用自定义线程池),RejectedExecutionHandler        //参数四:ProducerType,单生产者还是多生产者        //参数五:waitStrategy,等待策略        //1.实例化Disruptor对象        Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(            orderEventFactory,            ringBufferSize,            executor,            ProducerType.SINGLE,            new BlockingWaitStrategy()        );            //2.添加Event处理器,用于处理事件        //也就是构建Disruptor与消费者的一个关联关系        disruptor.handleEventsWith(new OrderEventHandler());            //3.启动disruptor        disruptor.start();            //4.获取实际存储数据的容器: RingBuffer        RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();        OrderEventProducer producer = new OrderEventProducer(ringBuffer);        ByteBuffer bb = ByteBuffer.allocate(8);        for (long i = 0; i < 5; i++) {            bb.putLong(0, i);            //向容器中投递数据            producer.sendData(bb);        }        disruptor.shutdown();        executor.shutdown();    }}
public class Disruptor<T> { private final RingBuffer<T> ringBuffer; private final Executor executor; ... //Create a new Disruptor. //@param eventFactory the factory to create events in the ring buffer. //@param ringBufferSize the size of the ring buffer, must be power of 2. //@param executor an Executor to execute event processors. //@param producerType the claim strategy to use for the ring buffer. //@param waitStrategy the wait strategy to use for the ring buffer. public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final Executor executor, final ProducerType producerType, final WaitStrategy waitStrategy) { this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), executor); } //Private constructor helper private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor) { this.ringBuffer = ringBuffer; this.executor = executor; } ...}
//Ring based store of reusable entries containing the data representing an event being exchanged between event producer and EventProcessors.//@param <E> implementation storing the data for sharing during exchange or parallel coordination of an event.public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> { //值为-1 public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE; protected long p1, p2, p3, p4, p5, p6, p7; ... //Create a new Ring Buffer with the specified producer type (SINGLE or MULTI) public static <E> RingBuffer<E> create(ProducerType producerType, EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) { switch (producerType) { case SINGLE: return createSingleProducer(factory, bufferSize, waitStrategy); case MULTI: return createMultiProducer(factory, bufferSize, waitStrategy); default: throw new IllegalStateException(producerType.toString()); } } //Create a new single producer RingBuffer with the specified wait strategy. public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) { SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy); return new RingBuffer<E>(factory, sequencer); } //Construct a RingBuffer with the full option set. //@param eventFactory to newInstance entries for filling the RingBuffer //@param sequencer sequencer to handle the ordering of events moving through the RingBuffer. RingBuffer(EventFactory<E> eventFactory, Sequencer sequencer) { super(eventFactory, sequencer); } ...}
abstract class RingBufferFields<E> extends RingBufferPad { private final long indexMask; //环形数组存储事件消息 private final Object[] entries; protected final int bufferSize; //RingBuffer的sequencer属性代表了当前线程对应的生产者 protected final Sequencer sequencer; ... RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) { this.sequencer = sequencer; this.bufferSize = sequencer.getBufferSize(); if (bufferSize < 1) { throw new IllegalArgumentException("bufferSize must not be less than 1"); } if (Integer.bitCount(bufferSize) != 1) { throw new IllegalArgumentException("bufferSize must be a power of 2"); } this.indexMask = bufferSize - 1; //初始化数组 this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD]; //内存预加载 fill(eventFactory); } private void fill(EventFactory<E> eventFactory) { for (int i = 0; i < bufferSize; i++) { //设置一个空的数据对象 entries[BUFFER_PAD + i] = eventFactory.newInstance(); } } ...}
abstract class RingBufferPad { protected long p1, p2, p3, p4, p5, p6, p7;}
复制代码


6.Disruptor 高性能之内核(使用单线程写)


Disruptor 的 RingBuffer 之所以可以做到完全无锁是因为单线程写。离开单线程写,没有任何技术可以做到完全无锁。Redis 和 Netty 等高性能技术框架也是利用单线程写来实现的。

 

具体就是:单生产者时,固然只有一个生产者线程在写。多生产者时,每个生产者线程都只会写各自获取到的 Sequence 序号对应的环形数组的元素,从而使得多个生产者线程相互之间不会产生写冲突。

 

7.Disruptor 高性能之系统内存优化(内存屏障)


要正确实现无锁,还需要另外一个关键技术——内存屏障。对应到 Java 语言,就是 valotile 变量与 Happens Before 语义。

 

内存屏障:Linux 的 smp_wmb()/smp_rmb()。

 

8.Disruptor 高性能之系统缓存优化(消除伪共享)


CPU 缓存是以缓存行(Cache Line)为单位进行存储的。缓存行是 2 的整数幂个连续字节,一般为 32-256 个字节,最常见的缓存行大小是 64 个字节。

 

当多线程修改互相独立的变量时,如果这些变量共享同一个缓存行,就会对这个缓存行形成竞争,从而无意中影响彼此性能,这就是伪共享。

 

消除伪共享:利用了空间换时间的思想。

 

由于代表着一个序号的 Sequence 其核心字段 value 是一个 long 型变量(占 8 个字节),所以有可能会出现多个 Sequence 对象的 value 变量共享同一个缓存行。因此,需要对 Sequence 对象的 value 变量消除伪共享。具体做法就是:对 Sequence 对象的 value 变量前后增加 7 个 long 型变量。

 

注意:伪共享与 Sequence 的静态变量无关,因为静态变量本身就是多个线程共享的,而不是多个线程隔离独立的。


class LhsPadding {    protected long p1, p2, p3, p4, p5, p6, p7;}
class Value extends LhsPadding { protected volatile long value;}
class RhsPadding extends Value { protected long p9, p10, p11, p12, p13, p14, p15;}
public class Sequence extends RhsPadding { static final long INITIAL_VALUE = -1L; private static final Unsafe UNSAFE; private static final long VALUE_OFFSET;
static { UNSAFE = Util.getUnsafe(); try { VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value")); } catch (final Exception e) { throw new RuntimeException(e); } }
//Create a sequence initialised to -1. public Sequence() { this(INITIAL_VALUE); }
//Create a sequence with a specified initial value. public Sequence(final long initialValue) { UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue); }
//Perform a volatile read of this sequence's value. public long get() { return value; }
//Perform an ordered write of this sequence. //The intent is a Store/Store barrier between this write and any previous store. public void set(final long value) { UNSAFE.putOrderedLong(this, VALUE_OFFSET, value); } ...}
复制代码


9.Disruptor 高性能之序号获取优化(自旋 + CAS)


生产者投递 Event 时会使用"long sequence = ringBuffer.next()"获取序号,而序号栅栏 SequenceBarrier 和会序号 Sequence 搭配起来一起使用,用来协调和管理消费者和生产者的工作节奏,避免锁的使用。

 

各个消费者和生产者都持有自己的序号,这些序号需满足如下条件以避免生产者速度过快,将还没来得及消费的消息覆盖。


一.消费者序号数值必须小于生产者序号数值二.消费者序号数值必须小于其前置消费者的序号数值三.生产者序号数值不能大于消费者中最小的序号数值
复制代码


高性能的序号获取优化:为避免生产者每次执行 next()获取序号时,都要查询消费者的最小序号,Disruptor 采取了自旋 + LockSupport 挂起线程 + 缓存最小序号 + CAS 来优化。既避免了锁,也尽量在不耗费 CPU 的情况下提升了性能。

 

单生产者的情况下,只有一个线程添加元素,此时没必要使用锁。多生产者的情况下,会有多个线程并发获取 Sequence 序号添加元素,此时会通过自旋 + CAS 避免锁。


public class OrderEventProducer {    private RingBuffer<OrderEvent> ringBuffer;        public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {        this.ringBuffer = ringBuffer;    }        public void sendData(ByteBuffer data) {        //1.在生产者发送消息时, 首先需要从ringBuffer里获取一个可用的序号        long sequence = ringBuffer.next();        try {            //2.根据这个序号, 找到具体的"OrderEvent"元素            //注意:此时获取的OrderEvent对象是一个没有被赋值的"空对象"            OrderEvent event = ringBuffer.get(sequence);            //3.进行实际的赋值处理            event.setValue(data.getLong(0));        } finally {            //4.提交发布操作            ringBuffer.publish(sequence);        }    }}
//Ring based store of reusable entries containing the data representing an event being exchanged between event producer and EventProcessors.//@param <E> implementation storing the data for sharing during exchange or parallel coordination of an event.public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> { //值为-1 public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE; protected long p1, p2, p3, p4, p5, p6, p7; ... //Increment and return the next sequence for the ring buffer. //Calls of this method should ensure that they always publish the sequence afterward. //E.g. // long sequence = ringBuffer.next(); // try { // Event e = ringBuffer.get(sequence); // ... // } finally { // ringBuffer.publish(sequence); // } //@return The next sequence to publish to. @Override public long next() { return sequencer.next(); } //Publish the specified sequence. //This action marks this particular message as being available to be read. //@param sequence the sequence to publish. @Override public void publish(long sequence) { sequencer.publish(sequence); } //Get the event for a given sequence in the RingBuffer. //This call has 2 uses. //Firstly use this call when publishing to a ring buffer. //After calling RingBuffer#next() use this call to get hold of the preallocated event to fill with data before calling RingBuffer#publish(long). //Secondly use this call when consuming data from the ring buffer. //After calling SequenceBarrier#waitFor(long) call this method with any value greater than that //your current consumer sequence and less than or equal to the value returned from the SequenceBarrier#waitFor(long) method. //@param sequence for the event //@return the event for the given sequence @Override public E get(long sequence) { //调用父类RingBufferFields的elementAt()方法 return elementAt(sequence); } ...}
abstract class RingBufferPad { protected long p1, p2, p3, p4, p5, p6, p7;}
abstract class RingBufferFields<E> extends RingBufferPad { ... private static final Unsafe UNSAFE = Util.getUnsafe(); private final long indexMask; //环形数组存储事件消息 private final Object[] entries; protected final int bufferSize; //RingBuffer的sequencer属性代表了当前线程对应的生产者 protected final Sequencer sequencer; RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) { this.sequencer = sequencer; this.bufferSize = sequencer.getBufferSize(); if (bufferSize < 1) { throw new IllegalArgumentException("bufferSize must not be less than 1"); } if (Integer.bitCount(bufferSize) != 1) { throw new IllegalArgumentException("bufferSize must be a power of 2"); } this.indexMask = bufferSize - 1; //初始化数组 this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD]; //内存预加载 fill(eventFactory); } private void fill(EventFactory<E> eventFactory) { for (int i = 0; i < bufferSize; i++) { entries[BUFFER_PAD + i] = eventFactory.newInstance(); } } protected final E elementAt(long sequence) { return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT)); } ...}
复制代码


public abstract class AbstractSequencer implements Sequencer {    private static final AtomicReferenceFieldUpdater<AbstractSequencer, Sequence[]> SEQUENCE_UPDATER =        AtomicReferenceFieldUpdater.newUpdater(AbstractSequencer.class, Sequence[].class, "gatingSequences");    //环形数组的大小    protected final int bufferSize;    //等待策略    protected final WaitStrategy waitStrategy;    //当前生产者的进度    protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);    //每一个Sequence都对应着一个消费者(一个EventHandler或者一个WorkHandler)    //这些Sequence会通过SEQUENCE_UPDATER在执行Disruptor的handleEventsWith()等方法时,    //由RingBuffer的addGatingSequences()方法进行添加    protected volatile Sequence[] gatingSequences = new Sequence[0];    ...        //Create with the specified buffer size and wait strategy.    //@param bufferSize The total number of entries, must be a positive power of 2.    //@param waitStrategy    public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy) {        if (bufferSize < 1) {            throw new IllegalArgumentException("bufferSize must not be less than 1");        }        if (Integer.bitCount(bufferSize) != 1) {            throw new IllegalArgumentException("bufferSize must be a power of 2");        }        this.bufferSize = bufferSize;        this.waitStrategy = waitStrategy;    }    ...}
abstract class SingleProducerSequencerPad extends AbstractSequencer { protected long p1, p2, p3, p4, p5, p6, p7; public SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy) { super(bufferSize, waitStrategy); }}
abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad { public SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy) { super(bufferSize, waitStrategy); } //表示生产者的当前序号,值为-1 protected long nextValue = Sequence.INITIAL_VALUE; //表示消费者的最小序号,值为-1 protected long cachedValue = Sequence.INITIAL_VALUE;}
public final class SingleProducerSequencer extends SingleProducerSequencerFields { protected long p1, p2, p3, p4, p5, p6, p7; //Construct a Sequencer with the selected wait strategy and buffer size. //@param bufferSize the size of the buffer that this will sequence over. //@param waitStrategy for those waiting on sequences. public SingleProducerSequencer(int bufferSize, WaitStrategy waitStrategy) { super(bufferSize, waitStrategy); } ... @Override public long next() { return next(1); } @Override public long next(int n) { //Sequence的初始化值为-1 if (n < 1) { throw new IllegalArgumentException("n must be > 0"); } //nextValue指的是当前Sequence //this.nextValue为SingleProducerSequencerFields的变量 //第一次调用next()方法时,nextValue = -1 //第二次调用next()方法时,nextValue = 0 //第三次调用next()方法时,nextValue = 1 //第四次调用next()方法时,nextValue = 2 //第五次调用next()方法时,nextValue = 3 long nextValue = this.nextValue; //第一次调用next()方法时,nextSequence = -1 + 1 = 0 //第二次调用next()方法时,nextSequence = 0 + 1 = 1 //第三次调用next()方法时,nextSequence = 1 + 1 = 2 //第四次调用next()方法时,nextSequence = 2 + 1 = 3 //第五次调用next()方法时,nextSequence = 3 + 1 = 4 long nextSequence = nextValue + n; //wrapPoint会用来判断生产者序号是否绕过RingBuffer的环 //如果wrapPoint是负数,则表示还没绕过RingBuffer的环 //如果wrapPoint是非负数,则表示已经绕过RingBuffer的环 //假设bufferSize = 3,那么: //第一次调用next()方法时,wrapPoint = 0 - 3 = -3,还没绕过RingBuffer的环 //第二次调用next()方法时,wrapPoint = 1 - 3 = -2,还没绕过RingBuffer的环 //第三次调用next()方法时,wrapPoint = 2 - 3 = -1,还没绕过RingBuffer的环 //第四次调用next()方法时,wrapPoint = 3 - 3 = 0,已经绕过RingBuffer的环 //第五次调用next()方法时,wrapPoint = 4 - 3 = 1,已经绕过RingBuffer的环 long wrapPoint = nextSequence - bufferSize; //cachedGatingSequence是用来将消费者的最小消费序号缓存起来 //这样就不用每次执行next()方法都要去获取消费者的最小消费序号 //第一次调用next()方法时,cachedGatingSequence = -1 //第二次调用next()方法时,cachedGatingSequence = -1 //第三次调用next()方法时,cachedGatingSequence = -1 //第四次调用next()方法时,cachedGatingSequence = -1 //第五次调用next()方法时,cachedGatingSequence = 1 long cachedGatingSequence = this.cachedValue; //第四次调用next()方法时,wrapPoint大于cachedGatingSequence,执行条件中的逻辑 if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) { //最小的消费者序号 long minSequence; //自旋操作: //Util.getMinimumSequence(gatingSequences, nextValue)的含义就是找到消费者中最小的序号值 //如果wrapPoint > 消费者中最小的序号,那么生产者线程就需要进行阻塞 //即如果生产者序号 > 消费者中最小的序号,那么就挂起并进行自旋操作 //第四次调用next()方法时,nextValue = 2,wrapPoint = 0,gatingSequences里面的消费者序号如果还没消费(即-1),则要挂起 while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) { //TODO: Use waitStrategy to spin? LockSupport.parkNanos(1L); } //cachedValue接收了消费者的最小序号 //第四次调用next()方法时,假设消费者的最小序号minSequence为1,则cachedValue = 1 this.cachedValue = minSequence; } //第一次调用完next()方法时,nextValue会变为0 //第二次调用完next()方法时,nextValue会变为1 //第三次调用完next()方法时,nextValue会变为2 //第四次调用完next()方法时,nextValue会变为3 //第五次调用完next()方法时,nextValue会变为4 this.nextValue = nextSequence; //第一次调用next()方法时,返回的nextSequence = 0 //第二次调用next()方法时,返回的nextSequence = 1 //第三次调用next()方法时,返回的nextSequence = 2 //第四次调用next()方法时,返回的nextSequence = 3 //第五次调用next()方法时,返回的nextSequence = 4 return nextSequence; } @Override public void publish(long sequence) { //设置当前生产者的sequence cursor.set(sequence); //通过等待策略通知阻塞的消费者 waitStrategy.signalAllWhenBlocking(); } ...}
public final class Util { ... //Get the minimum sequence from an array of {@link com.lmax.disruptor.Sequence}s. //@param sequences to compare. //@param minimum an initial default minimum. If the array is empty this value will be returned. //@return the smaller of minimum sequence value found in sequences and minimum; minimum if sequences is empty public static long getMinimumSequence(final Sequence[] sequences, long minimum) { for (int i = 0, n = sequences.length; i < n; i++) { long value = sequences[i].get(); minimum = Math.min(minimum, value); } return minimum; } ...}
public final class MultiProducerSequencer extends AbstractSequencer { ... @Override public long next() { return next(1); } @Override public long next(int n) { if (n < 1) { throw new IllegalArgumentException("n must be > 0"); } long current; long next; do { //获取当前生产者的序号 current = cursor.get(); next = current + n; //wrapPoint会用来判断生产者序号是否绕过RingBuffer的环 //如果wrapPoint是负数,则表示还没绕过RingBuffer的环 //如果wrapPoint是非负数,则表示已经绕过RingBuffer的环 long wrapPoint = next - bufferSize; //cachedGatingSequence是用来将消费者的最小消费序号缓存起来 //这样就不用每次执行next()方法都要去获取消费者的最小消费序号 long cachedGatingSequence = gatingSequenceCache.get(); if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) { //gatingSequence表示的是消费者的最小序号 long gatingSequence = Util.getMinimumSequence(gatingSequences, current); if (wrapPoint > gatingSequence) { //TODO, should we spin based on the wait strategy? LockSupport.parkNanos(1); continue; } gatingSequenceCache.set(gatingSequence); } else if (cursor.compareAndSet(current, next)) { break; } } while (true); return next; } ...}
复制代码


文章转载自:东阳马生架构

原文链接:https://www.cnblogs.com/mjunz/p/18896162

体验地址:http://www.jnpfsoft.com/?from=001YH

用户头像

还未添加个人签名 2023-06-19 加入

还未添加个人简介

评论

发布
暂无评论
Disruptor—核心源码实现分析(三)_Java_不在线第一只蜗牛_InfoQ写作社区