写点什么

无锁并发框架 -Disruptor 的使用(二)

  • 2021 年 11 月 12 日
  • 本文字数:1801 字

    阅读完需:约 6 分钟

  • 18


1.4、定义事件消费者


package com.zhz.disruptor;


import com.lmax.disruptor.EventHandler;


import com.zhz.disruptor.event.LongEvent;


import lombok.extern.slf4j.Slf4j;


/**


  • @author :zhz

  • @date :Created in 2020/12/30

  • @version: V1.0

  • @slogan: 天下风云出我辈,一入代码岁月催

  • @description: 事件消费者


**/


@Slf4j


public class LongEventHandler implements EventHandler<LongEvent> {


private long serial = 0;


public LongEventHandler(long serial){


this.serial = serial;


}


@Override


public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {


log.info("消费者-{}:{}",this.serial,event.getValue());


}


}


  • 1

  • 2

  • 3

  • 4

  • 5

  • 6

  • 7

  • 8

  • 9

  • 10

  • 11

  • 12

  • 13

  • 14

  • 15

  • 16

  • 17

  • 18

  • 19

  • 20

  • 21

  • 22

  • 23

  • 24

  • 25

  • 26

  • 27


1.5、定义生产者


package com.zhz.disruptor;


import com.lmax.disruptor.RingBuffer;


import com.zhz.disruptor.event.LongEvent;


import lombok.extern.slf4j.Slf4j;


import java.nio.ByteBuffer;


/**


  • @author :zhz

  • @date :Created in 2020/12/30

  • @version: V1.0

  • @slogan: 天下风云出我辈,一入代码岁月催

  • @description: 事件生产者


**/


@Slf4j


public class LongEventProducer {


public final RingBuffer<LongEvent> ringBuffer;


public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {


this.ringBuffer = ringBuffer;


}


public void onData(ByteBuffer byteBuffer) {


// 1.ringBuffer 事件队列 下一个槽


long sequence = ringBuffer.next();


Long data = null;


try {


//2.取出空的事件队列


LongEvent event = ringBuffer.get(sequence);


//3.获取事件队列传递的数据


data = byteBuffer.getLong(0);


event.setValue(data);


try {


Thread.sleep(10);


} catch (InterruptedException e) {


e.printStackTrace();


}


} finally {


log.info("生产者准备发送数据");


//4.发布事件,


//注意,最后的 ringBuffer.publish 方法必须包含在 finally 中以确保必须得到调用;


// 如果某个请求的 sequence 未被提交,将会堵塞后续的发布操作或者其它的 producer。


ringBuffer.publish(sequence);


}


}


}


  • 1

  • 2

  • 3

  • 4

  • 5

  • 6

  • 7

  • 8

  • 9

  • 10

  • 11

  • 12

  • 13

  • 14

  • 15

  • 16

  • 17

  • 18

  • 19

  • 20

  • 21

  • 22

  • 23

  • 24

  • 25

  • 26

  • 27

  • 28

  • 29

  • 30

  • 31

  • 32

  • 33

  • 34

  • 35

  • 36

  • 37

  • 38

  • 39

  • 40

  • 41

  • 42

  • 43

  • 44

  • 45

  • 46

  • 47

  • 48


1.6、定义 Main 入口


package com.zhz.disruptor;


import com.lmax.disruptor.EventFactory;


import com.lmax.disruptor.RingBuffer;


import com.lmax.disruptor.YieldingWaitStrategy;


import com.lmax.disruptor.dsl.Disruptor;


import com.lmax.disruptor.dsl.ProducerType;


import com.zhz.disruptor.event.LongEvent;


import com.zhz.disruptor.event.LongEventFactory;


import java.nio.ByteBuffer;


import java.util.concurrent.ExecutorService;


import java.util.concurrent.Executors;


/**


  • @author :zhz

  • @date :Created in 2020/12/30

  • @version: V1.0

  • @slogan: 天下风云出我辈,一入代码岁月催

  • @description:


**/


public class DisruptorMain {


public static void main(String[] args) {


// 1.创建一个可缓存的线程 提供线程来出发 Consumer 的事件处理


ExecutorService executor = Executors.newCachedThreadPool();


// 2.创建工厂


EventFactory eventFactory = new LongEventFactory();


// 3.创建 ringBuffer 大小,ringBufferSize 大小一定要是 2 的 N 次方


int ringBufferSize = 1024;


// 4.创建 Disruptor


Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());


// 5.连接消费端方法


disruptor.handleEventsWith(new LongEventHandler(1),new LongEventHandler(2));


// 6.启动


disruptor.start();


// 7.创建 RingBuffer 容器


RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();


// 8.创建生产者


LongEventProducer producer = new LongEventProducer(ringBuffer);


// 9.指定缓冲区大小


ByteBuffer byteBuffer = ByteBuffer.allocate(8);


for (int i = 1; i <= 10; i++) {


byteBuffer.putLong(0,i);


producer


【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


.onData(byteBuffer);


}


//10.关闭 disruptor 和 executor


disruptor.shutdown();


executor.shutdown();


}


}


  • 1

  • 2

  • 3

  • 4

  • 5

  • 6

  • 7

  • 8

  • 9

  • 10

  • 11

  • 12

  • 13

  • 14

  • 15

  • 16

  • 17

  • 18

  • 19

  • 20

  • 21

  • 22

  • 23

  • 24

  • 25

  • 26

  • 27

  • 28

  • 29

  • 30

  • 31

  • 32

  • 33

  • 34

  • 35

  • 36

  • 37

  • 38

  • 39

  • 40

  • 41

  • 42

  • 43

  • 44

  • 45

  • 46

  • 47

  • 48

  • 49

  • 50

  • 51

  • 点赞

  • 评论

  • 分享

  • 收藏

  • 打赏

  • 举报

  • 已关注

评论

发布
暂无评论
无锁并发框架-Disruptor的使用(二)