写点什么

Disruptor 高性能堆内队列 系列一

作者:Nick
  • 2022 年 6 月 06 日
  • 本文字数:1511 字

    阅读完需:约 5 分钟

Disruptor 高性能堆内队列 系列一

本文使用「署名 4.0 国际 (CC BY 4.0)」许可协议,欢迎转载、或重新修改使用,但需要注明来源。 署名 4.0 国际 (CC BY 4.0)

本文作者: Nicksxs

创建时间: 2022-02-03

本文链接: Disruptor 系列一


很久之前就听说过这个框架,不过之前有点跟消息队列混起来,这个也是种队列,但不是跟 rocketmq,nsq 那种一样的,而是在进程内部提供队列服务的,偏向于取代ArrayBlockingQueue,因为这个阻塞队列是使用了锁来控制阻塞,关于并发其实有一些通用的最佳实践,就是用锁,即使是 JDK 提供的锁,也是比较耗资源的,当然这是跟不加锁的对比,同样是锁,JDK 的实现还是性能比较优秀的。常见的阻塞队列中例如 ArrayBlockingQueueLinkedBlockingQueue 都有锁的身影的存在,区别在于 ArrayBlockingQueue 是一把锁,后者是两把锁,不过重点不在几把锁,这里其实是两个问题,一个是所谓的 lock free, 对于一个单生产者的 disruptor 来说,因为写入是只有一个线程的,是可以不用加锁,多生产者的时候使用的是 cas 来获取对应的写入坑位,另一个是解决“伪共享”问题,后面可以详细点分析,先介绍下使用首先是数据源


public class LongEvent {    private long value;
public void set(long value) { this.value = value; }
public long getValue() { return value; }
public void setValue(long value) { this.value = value; }}
复制代码


事件生产


public class LongEventFactory implements EventFactory<LongEvent>{    public LongEvent newInstance()    {        return new LongEvent();    }}
复制代码


事件处理器


public class LongEventHandler implements EventHandler<LongEvent> {
// event 事件, // sequence 当前的序列 // 是否当前批次最后一个数据 public void onEvent(LongEvent event, long sequence, boolean endOfBatch) { String str = String.format("long event : %s l:%s b:%s", event.getValue(), sequence, endOfBatch); System.out.println(str); }}
复制代码


主方法代码


package disruptor;
import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.util.DaemonThreadFactory;
import java.nio.ByteBuffer;
public class LongEventMain{ public static void main(String[] args) throws Exception { // 这个需要是 2 的幂次,这样在定位的时候只需要位移操作,也能减少各种计算操作 int bufferSize = 1024;
Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
// 类似于注册处理器 disruptor.handleEventsWith(new LongEventHandler()); // 或者直接用 lambda disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event)); // 启动我们的 disruptor disruptor.start();

RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); ByteBuffer bb = ByteBuffer.allocate(8); for (long l = 0; true; l++) { bb.putLong(0, l); // 生产事件 ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb); Thread.sleep(1000); } }}
复制代码


运行下可以看到运行结果



这里其实就只是最简单的使用,生产者只有一个,然后也不是批量的。

发布于: 刚刚阅读数: 4
用户头像

Nick

关注

还未添加个人签名 2017.12.22 加入

写代码的阿森 https://nicksxs.me https://nicksxs.com 也可以看我的博客

评论

发布
暂无评论
Disruptor 高性能堆内队列 系列一_Java_Nick_InfoQ写作社区