写点什么

disruptor 笔记之一:快速入门

作者:Java高工P7
  • 2021 年 11 月 10 日
  • 本文字数:4479 字

    阅读完需:约 15 分钟

// Copy LICENSE


tasks.withType(Jar) {


from(project.rootDir) {


include 'LICENSE'


into 'META-INF'


}


}


// 写入到 MANIFEST.MF 中的内容


jar {


manifest {


attributes(


'Created-By': "{System.properties['java.vendor']} ${System.properties['java.vm.version']})".toString(),


'Built-By': 'travis',


'Build-Date': buildDate,


'Build-Time': buildTime,


'Built-OS': "${System.properties['os.name']}",


'Build-Revision': buildRevision,


'Specification-Title': project.name,


'Specification-Version': projectVersion,


'Specification-Vendor': 'Will Zhao',


'Implementation-Title': project.name,


'Implementation-Version': projectVersion,


'Implementation-Vendor': 'Will Zhao'


)


}


}


repositories {


mavenCentral()


// 如果有私服就在此配置,如果没有请注释掉


maven {


url 'http://192.168.50.43:8081/repository/aliyun-proxy/'


}


// 阿里云


maven {


url 'http://maven.aliyun.com/nexus/content/groups/public/'


}


jcenter()


}


buildscript {


repositories {


maven { url 'https://plugins.gradle.org/m2/' }


}


}


}


allprojects { project ->


buildscript {


dependencyManagement {


imports {


mavenBom "org.springframework.boot:spring-boot-starter-parent:${springBootVersion}"


mavenBom "org.junit:junit-bom:5.7.0"


}


dependencies {


dependency 'org.projectlombok:lombok:1.16.16'


dependency 'org.apache.commons:commons-lang3:3.11'


dependency 'commons-collections:commons-collections:3.2.2'


dependency 'com.lmax:disruptor:3.4.4'


}


}


ext {


springFrameworkVersion = dependencyManagement.importedProperties['spring-framework.version']


}


}


}


group = 'bolingcavalry'


version = projectVersion


  • 接下来编写消息发布和消费的代码;

新建 module

  • 前面新建了整个《Disruptor 笔记》系列的父工程,现在新建名为 basic-event 的 module,其 build.gradle 内容如下:


plugins {


id 'org.springframework.boot'


}


dependencies {


implementation 'org.projectlombok:lombok'


implementation 'org.springframework.boot:spring-boot-starter'


implementation 'org.springframework.boot:spring-boot-starter-web'


implementation 'com.lmax:disruptor'


testImplementation('org.springframework.boot:spring-boot-starter-test')


}


  • 这个 module 是个 springboot 应用,启动类如下:


package com.bolingcavalry;


import org.springframework.boot.SpringApplication;


import org.springframework.boot.autoconfigure.SpringBootApplication;


@SpringBootApplication


public class BasicEventApplication {


public static void main(String[] args) {


SpringApplication.run(BasicEventApplication.class, args);


}


}


  • 接下来按照前面总结的套路行事;

事件的定义

  • 事件定义类 StringEvent.java,可见就是个普普通通的 java bean:


package com.bolingcavalry.service;


import lombok.Data;


import lombok.NoArgsConstructor;


import lombok.ToString;


@Data


@ToString


@NoArgsConstructor


public class StringEvent {


private String value;


}

事件工厂

  • 事件工厂的作用,是让 disruptor 知道如何在内存中创建一个事件实例,不过,该实例和业务还没有任何关系,本篇的事件工厂如下,可见就是创建 StringEvent 实例,并没有特别的操作:


package com.bolingcavalry.service;


import com.lmax.disruptor.EventFactory;


public class StringEventFactory implements EventFactory<StringEvent> {


@Override


public StringEvent newInstance() {


return new StringEvent();


}


}

事件处理

  • 时间处理类的作用是定义一个事件如何被消费,里面是具体的业务代码,每个事件都会执行此类的 onEvent 方法;

  • 本篇的事件处理类做的事情是打印事件内容,再用 sleep 消耗 100 毫秒,然后再调用外部传入的 Consumer 实现类的 accept 方法:


package com.bolingcavalry.service;


import com.lmax.disruptor.EventHandler;


import lombok.Setter;


import lombok.extern.slf4j.Slf4j;


import java.util.function.Consumer;


@Slf4j


public class StringEventHandler implements EventHandler<StringEvent> {


public StringEventHandler(Consumer<?> consumer) {


this.consumer = consumer;


}


// 外部可以传入 Consumer 实现类,每处理一条消息的时候,consumer 的 accept 方法就会被执行一次


private Consumer<?> consumer;


@Override


public void onEvent(StringEvent event, long sequence, boolean endOfBatch) throws Exception {


log.info("sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event);


// 这里延时 100ms,模拟消费事件的逻辑的耗时


Thread.sleep(100);


// 如果外部传入了 consumer,就要执行一次 accept 方法


if (null!=consumer) {


consumer.accept(null);


}


}


}

事件生产者

  • 每当业务要生产一个事件时,就会调用事件生产者的 onData 方法,将业务数据作为入参传进来,此时生产者会从环形队列中取出一个事件实例(就是前面的事件工厂创建的),把业务数据传给这个实例,再把实例正式发布出去:


package com.bolingcavalry.service;


import com.lmax.disruptor.RingBuffer;


public class StringEventProducer {


// 存储数据的环形队列


private final RingBuffer<StringEvent> ringBuffer;


public StringEventProducer(RingBuffer<StringEvent> ringBuffer) {


this.ringBuffer = ringBuffer;


}


public void onData(String content) {


// ringBuffer 是个队列,其 next 方法返回的是下最后一条记录之后的位置,这是个可用位置


long sequence = ringBuffer.next();


try {


// sequence 位置取出的事件是空事件


StringEvent stringEvent = ringBuffer.get(sequence);


// 空事件添加业务信息


stringEvent.setValue(content);


} finally {


// 发布


ringBuffer.publish(sequence);


}


}


}

初始化逻辑

  • 开发一个 spring bean,这里面有 disruptor 的初始化逻辑,有几处需要关注的地方稍后会说到:


package com.bolingcavalry.service.impl;


import com.bolingcavalry.service.*;


import com.lmax.disruptor.RingBuffer;


import com.lmax.disruptor.dsl.Disruptor;


import com.lmax.disruptor.util.DaemonThreadFactory;


import lombok.extern.slf4j.Slf4j;


import org.springframework.scheduling.concurrent.CustomizableThreadFactory;


import org.springframework.stereotype.Service;


import javax.annotation.PostConstruct;


import java.time.LocalDateTime;


import java.util.concurrent.Executor;


import java.util.concurrent.Executors;


import java.util.concurrent.ThreadFactory;


import java.util.concurrent.atomic.AtomicInteger;


import java.util.concurrent.atomic.AtomicLong;


import java.util.function.Consumer;


@Service


@Slf4j


public class BasicEventServiceImpl implements BasicEventService {


private static final int BUFFER_SIZE = 16;


private Disruptor<StringEvent> disruptor;


private StringEventProducer producer;


/**


  • 统计消息总数


*/


private final AtomicLong eventCount = new AtomicLong();


@PostConstruct


private void init() {


Executor executor = Executors.newCachedThreadPool();


// 实例化


disruptor = new Disruptor<>(new StringEventFactory(),


BUFFER_SIZE,


new CustomizableThreadFactory("event-handler-"));


// 准备一个匿名类,传给 disruptor 的事件处理类,


// 这样每次处理事件时,都会将已经处理事件的总数打印出来


Consumer<?> eventCountPrinter = new Consumer<Object>() {


@Override


public void accept(Object o) {


long count = eventCount.incrementAndGet();


log.info("receive [{}] event", count);


}


};


// 指定处理类


disruptor.handleEventsWith(new StringEventHandler(eventCountPrinter));


// 启动


disruptor.start();


// 生产者


producer = new StringEventProducer(disruptor.getRingBuffer());


}


@Override


public void publish(String value) {


producer.onData(value);


}


@Override


public long eventCount() {


return eventCount.get();


}


}


  • 上述代码有以下几点需要注意:


  1. publish 方法给外部调用,用于发布一个事件;

  2. eventCountPrinter 是 Consumer 的实现类,被传给了 StringEventHandler,这样 StringEventHandler 消费消息的时候,eventCount 就会增加,也就记下了已经处理的事件总数;

  3. Disruptor 的构造方法中,BUFFER_SIZE 表示环形队列的大小,这里故意设置为 16,这样可以轻易的将环形队列填满,此时再发布事件会不会导致环形队列上的数据被覆盖呢?稍后咱们可以测一下;

  4. 记得调用 start 方法;

web 接口

再写一个 web 接口类,这样就可以通过浏览器验证前面的代码了:


package com.bolingcavalry.controller;


import com.bolingcavalry.service.BasicEventService;


import org.springframework.beans.factory.annotation.Autowired;


import org.springframework.boot.SpringApplication;


import org.springframework.boot.autoconfigure.SpringBootApplication;


import org.springframework.http.HttpStatus;


import org.springframework.web.bind.annotation.*;


import java.time.LocalDateTime;


@RestController


public class BasicEventController {


@Autowired


BasicEventService basicEventService;


@RequestMapping(value = "/{value}", method = RequestMethod.GET)


public String publish(@PathVariable("value") String va


【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


lue) {


basicEventService.publish(value);


return "success, " + LocalDateTime.now().toString();


}


}

业务逻辑

  • 现在生产事件的接口已准备好,消费事件的代码也完成了,接下来就是如何调用生产事件的接口来验证生产和消费是否正常,这里我选择使用单元测试来验证;

  • 在 disruptor-tutorials\basic-event\src\test\java 目录下新增测试类 BasicEventServiceImplTest.java,测试逻辑是发布了一百个事件,再验证消费事件的数量是否也等于一百:


package com.bolingcavalry.service.impl;


import com.bolingcavalry.service.BasicEventService;


import lombok.extern.slf4j.Slf4j;


import org.junit.Test;


import org.junit.runner.RunWith;


import org.springframework.beans.factory.annotation.Autowired;


import org.springframework.boot.test.context.SpringBootTest;


import org.springframework.test.context.junit4.SpringRunner;


import static org.junit.Assert.assertEquals;


@RunWith(SpringRunner.class)


@SpringBootTest


@Slf4j


public class BasicEventServiceImplTest {


@Autowired


BasicEventService basicEventService;


@Test


public void publish() throws InterruptedException {


log.info("start publich test");


int count = 100;


for(int i=0;i<count;i++) {


log.info("publich {}", i);


basicEventService.publish(String.valueOf(i));

用户头像

Java高工P7

关注

还未添加个人签名 2021.11.08 加入

还未添加个人简介

评论

发布
暂无评论
disruptor笔记之一:快速入门