无锁并发框架——Disruptor 的使用(二)
18
package com.zhz.disruptor;
import com.lmax.disruptor.EventHandler;
import com.zhz.disruptor.event.LongEvent;
import lombok.extern.slf4j.Slf4j;
/**
 * Event consumer: writes the value of every {@link LongEvent} it receives to the log,
 * tagged with this handler's serial number.
 *
 * @author zhz
 * @date Created in 2020/12/30
 * @version V1.0
 */
@Slf4j
public class LongEventHandler implements EventHandler<LongEvent> {

    /** Number included in each log line so output from different handlers can be told apart. */
    private long serial;

    /**
     * @param serial number used to label this handler's log output
     */
    public LongEventHandler(long serial) {
        this.serial = serial;
    }

    /**
     * Logs the handler's serial number together with the event's value.
     *
     * @param event      the event handed to this handler
     * @param sequence   sequence number of the event (not used here)
     * @param endOfBatch end-of-batch flag (not used here)
     */
    @Override
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
        final long consumerId = serial;
        final Object payload = event.getValue();
        log.info("消费者-{}:{}", consumerId, payload);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.zhz.disruptor;
import com.lmax.disruptor.RingBuffer;
import com.zhz.disruptor.event.LongEvent;
import lombok.extern.slf4j.Slf4j;
import java.nio.ByteBuffer;
/**
 * Event producer: copies a {@code long} out of a {@link ByteBuffer} into the next free
 * {@link LongEvent} slot of the ring buffer and publishes it.
 *
 * @author zhz
 * @date Created in 2020/12/30
 * @version V1.0
 */
@Slf4j
public class LongEventProducer {

    /** Ring buffer that slots are claimed from and events are published to. */
    public final RingBuffer<LongEvent> ringBuffer;

    /**
     * @param ringBuffer ring buffer this producer publishes into
     */
    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    /**
     * Publishes one event whose value is the {@code long} stored at index 0 of {@code byteBuffer}.
     *
     * <p>If the calling thread is interrupted during the short pause, the interrupt flag is
     * restored so the caller can observe it; the event is still published.
     *
     * @param byteBuffer buffer holding the value to publish at absolute index 0
     */
    public void onData(ByteBuffer byteBuffer) {
        // 1. Claim the next slot. This stays outside the try block: if claiming fails
        //    there is no sequence that would need to be published.
        long sequence = ringBuffer.next();
        try {
            // 2. Fetch the pre-allocated event sitting in the claimed slot.
            LongEvent event = ringBuffer.get(sequence);
            // 3. Copy the payload into the event.
            event.setValue(byteBuffer.getLong(0));
            try {
                // Brief pause between events (demo pacing).
                Thread.sleep(10);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing it.
                Thread.currentThread().interrupt();
                log.warn("Producer interrupted while pausing before publish", e);
            }
        } finally {
            log.info("生产者准备发送数据");
            // 4. Publish the event. This must sit in finally so it always runs: a claimed
            //    sequence that is never published blocks later publications and other producers.
            ringBuffer.publish(sequence);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.zhz.disruptor;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.zhz.disruptor.event.LongEvent;
import com.zhz.disruptor.event.LongEventFactory;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Demo entry point: wires a Disruptor with two {@link LongEventHandler} consumers,
 * publishes the values 1..10 through a {@link LongEventProducer}, then shuts everything down.
 *
 * @author zhz
 * @date Created in 2020/12/30
 * @version V1.0
 */
public class DisruptorMain {

    /**
     * Runs the demo.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // 1. Cached thread pool that supplies the threads running the consumers' event handling.
        ExecutorService executor = Executors.newCachedThreadPool();
        // 2. Factory used to pre-populate the ring buffer with events.
        //    Parameterized (not raw) so the Disruptor construction below is type-checked.
        EventFactory<LongEvent> eventFactory = new LongEventFactory();
        // 3. Ring buffer size; it must be a power of two.
        int ringBufferSize = 1024;
        // 4. Create the Disruptor (single producer, yielding wait strategy).
        Disruptor<LongEvent> disruptor = new Disruptor<>(
                eventFactory, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());
        // 5. Register the two consumers.
        disruptor.handleEventsWith(new LongEventHandler(1), new LongEventHandler(2));
        // 6. Start the Disruptor.
        disruptor.start();
        try {
            // 7. Obtain the ring buffer.
            RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
            // 8. Create the producer.
            LongEventProducer producer = new LongEventProducer(ringBuffer);
            // 9. An 8-byte buffer holds exactly one long; it is overwritten on each iteration.
            ByteBuffer byteBuffer = ByteBuffer.allocate(8);
            for (int i = 1; i <= 10; i++) {
                byteBuffer.putLong(0, i);
                producer.onData(byteBuffer);
            }
        } finally {
            // 10. Always shut down the Disruptor and the executor, even if publishing failed,
            //     so the pool's threads do not keep the JVM alive.
            try {
                disruptor.shutdown();
            } finally {
                executor.shutdown();
            }
        }
    }
}
评论