写点什么

disruptor 笔记之一:快速入门

  • 2022 年 4 月 24 日
  • 本文字数:4050 字

    阅读完需:约 13 分钟

作为《disruptor 笔记》系列的开篇,本篇有两个任务:


  • 创建名为 disruptor-tutorials 的 gradle 工程,作为整个系列的父工程,该系列所有代码都是这个父工程下的 module;

  • 在 disruptor-tutorials 下面新建名为 basic-event 的 module,这是个 springboot 应用,作用是使用 disruptor 的基本功能:一个线程发布事件,另一个线程消费事件,也就是对环形队列最基本的操作,如下图:


[](()用 disruptor 实现消息的发布和消费的套路

  • 咱们提前小结用 disruptor 实现消息的发布和消费的套路,后面的开发按部就班即可,括号中是本篇对应的 java 类:


  1. 事件的定义:一个普通的 bean(StringEvent.j 《一线大厂 Java 面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》无偿开源 威信搜索公众号【编程进阶路】 ava)

  2. 事件工厂:定义如何生产事件的内存实例,这个实例刚从内存中创建,还没有任何业务数据(StringEventFactory.java)

  3. 事件处理:封装了消费单个事件的具体逻辑(StringEventHandler.java)

  4. 事件生产者:定义了如何将业务数据设置到还没有业务数据的事件中,就是工厂创建出来的那种(StringEventProducer.java)

  5. 初始化逻辑:创建和启动 disruptor 对象,将事件工厂传给 disruptor,创建事件生产者和事件处理对象,并分别与 disruptor 对象关联;

  6. 业务逻辑:也就是调用事件生产者的 onData 方法发布事件,本文的做法是在单元测试类中发布事件,然后检查消费的事件数和生产的事件数是否一致;

[](()环境信息

《Disruptor 笔记》系列涉及的环境信息如下:


  1. 操作系统:64 位 win10

  2. JDK:1.8.0_281

  3. IDE:IntelliJ IDEA 2021.1.1 (Ultimate Edition)

  4. gradle:6.7.1

  5. springboot:2.3.8.RELEASE

  6. disruptor:3.4.4

[](()源码下载

  • 本篇实战中的完整源码可在 GitHub 下载到,地址和链接信息如下表所示(https://github.com/zq2599/blog_demos):


| 名称 | 链接 | 备注 |


| :-- | :-- | :-- |


| 项目主页 | https://github.com/zq2599/blog_demos | 该项目在 GitHub 上的主页 |


| git 仓库地址(https) | https://github.com/zq2599/blog_demos.git | 该项目源码的仓库地址,https 协议 |


| git 仓库地址(ssh) | git@github.com:zq2599/blog_demos.git | 该项目源码的仓库地址,ssh 协议 |


  • 这个 git 项目中有多个文件夹,本次实战的源码在 disruptor-tutorials 文件夹下,如下图红框所示:


[](()创建父工程

  • 因为是系列文章,所以这里做个父工程来管理所有依赖库和插件,新建名为 disruptor-tutorials 的 gradle 工程,build.gradle 如下:


import java.time.OffsetDateTime


import java.time.format.DateTimeFormatter


buildscript {


repositories {


maven {


url 'https://plugins.gradle.org/m2/'


}


// 如果有私服就在此配置,如果没有请注释掉


maven {


url 'http://192.168.50.43:8081/repository/aliyun-proxy/'


}


// 阿里云


maven {


url 'http://maven.aliyun.com/nexus/content/groups/public/'


}


mavenCentral()


}


ext {


// 项目版本


projectVersion = '1.0-SNAPSHOT'


// sprignboot 版本 https://github.com/spring-projects/spring-boot/releases


springBootVersion = '2.3.8.RELEASE'


}


}


plugins {


id 'java'


id 'java-library'


id 'org.springframework.boot' version "${springBootVersion}" apply false


id 'io.spring.dependency-management' version '1.0.11.RELEASE'


id 'net.nemerosa.versioning' version '2.14.0'


id 'io.franzbecker.gradle-lombok' version '4.0.0' apply false


id 'com.github.ben-manes.versions' version '0.36.0' // gradle dependencyUpdates


}


// If you attempt to build without the --scan parameter in gradle 6.0+ it will cause a build error that it can't find


// a buildScan property to change. This avoids that problem.


if (hasProperty('buildScan')) {


buildScan {


termsOfServiceUrl = 'https://gradle.com/terms-of-service'


termsOfServiceAgree = 'yes'


}


}


wrapper {


gradleVersion = '6.7.1'


}


def buildTimeAndDate = OffsetDateTime.now()


ext {


// 构建时取得当前日期和时间


buildDate = DateTimeFormatter.ISO_LOCAL_DATE.format(buildTimeAndDate)


buildTime = DateTimeFormatter.ofPattern('HH:mm:ss.SSSZ').format(buildTimeAndDate)


buildRevision = versioning.info.commit


}


allprojects {


apply plugin: 'java'


apply plugin: 'idea'


apply plugin: 'eclipse'


apply plugin: 'io.spring.dependency-management'


apply plugin: 'io.franzbecker.gradle-lombok'


compileJava {


sourceCompatibility = JavaVersion.VERSION_1_8


targetCompatibility = JavaVersion.VERSION_1_8


options.encoding = 'UTF-8'


}


compileJava.options*.compilerArgs = [


'-Xlint:all', '-Xlint:-processing'


]


// Copy LICENSE


tasks.withType(Jar) {


from(project.rootDir) {


include 'LICENSE'


into 'META-INF'


}


}


// 写入到 MANIFEST.MF 中的内容


jar {


manifest {


attributes(


'Created-By': "{System.properties['java.vendor']} ${System.properties['java.vm.version']})".toString(),


'Built-By': 'travis',


'Build-Date': buildDate,


'Build-Time': buildTime,


'Built-OS': "${System.properties['os.name']}",


'Build-Revision': buildRevision,


'Specification-Title': project.name,


'Specification-Version': projectVersion,


'Specification-Vendor': 'Will Zhao',


'Implementation-Title': project.name,


'Implementation-Version': projectVersion,


'Implementation-Vendor': 'Will Zhao'


)


}


}


repositories {


mavenCentral()


// 如果有私服就在此配置,如果没有请注释掉


maven {


url 'http://192.168.50.43:8081/repository/aliyun-proxy/'


}


// 阿里云


maven {


url 'http://maven.aliyun.com/nexus/content/groups/public/'


}


jcenter()


}


buildscript {


repositories {


maven { url 'https://plugins.gradle.org/m2/' }


}


}


}


allprojects { project ->


buildscript {


dependencyManagement {


imports {


mavenBom "org.springframework.boot:spring-boot-starter-parent:${springBootVersion}"


mavenBom "org.junit:junit-bom:5.7.0"


}


dependencies {


dependency 'org.projectlombok:lombok:1.16.16'


dependency 'org.apache.commons:commons-lang3:3.11'


dependency 'commons-collections:commons-collections:3.2.2'


dependency 'com.lmax:disruptor:3.4.4'


}


}


ext {


springFrameworkVersion = dependencyManagement.importedProperties['spring-framework.version']


}


}


}


group = 'bolingcavalry'


version = projectVersion


  • 接下来编写消息发布和消费的代码;

[](()新建 module

  • 前面新建了整个《Disruptor 笔记》系列的父工程,现在新建名为 basic-event 的 module,其 build.gradle 内容如下:


plugins {


id 'org.springframework.boot'


}


dependencies {


implementation 'org.projectlombok:lombok'


implementation 'org.springframework.boot:spring-boot-starter'


implementation 'org.springframework.boot:spring-boot-starter-web'


implementation 'com.lmax:disruptor'


testImplementation('org.springframework.boot:spring-boot-starter-test')


}


  • 这个 module 是个 springboot 应用,启动类如下:


package com.bolingcavalry;


import org.springframework.boot.SpringApplication;


import org.springframework.boot.autoconfigure.SpringBootApplication;


@SpringBootApplication


public class BasicEventApplication {


public static void main(String[] args) {


SpringApplication.run(BasicEventApplication.class, args);


}


}


  • 接下来按照前面总结的套路行事;

[](()事件的定义

  • 事件定义类 StringEvent.java,可见就是个普普通通的 java bean:


package com.bolingcavalry.service;


import lombok.Data;


import lombok.NoArgsConstructor;


import lombok.ToString;


@Data


@ToString


@NoArgsConstructor


public class StringEvent {


private String value;


}

[](()事件工厂

  • 事件工厂的作用,是让 disruptor 知道如何在内存中创建一个事件实例,不过,该实例和业务还没有任何关系,本篇的事件工厂如下,可见就是创建 StringEvent 实例,并没有特别的操作:


package com.bolingcavalry.service;


import com.lmax.disruptor.EventFactory;


public class StringEventFactory implements EventFactory<StringEvent> {


@Override


public StringEvent newInstance() {


return new StringEvent();


}


}

[](()事件处理

  • 时间处理类的作用是定义一个事件如何被消费,里面是具体的业务代码,每个事件都会执行此类的 onEvent 方法;

  • 本篇的事件处理类做的事情是打印事件内容,再用 sleep 消耗 100 毫秒,然后再调用外部传入的 Consumer 实现类的 accept 方法:


package com.bolingcavalry.service;


import com.lmax.disruptor.EventHandler;


import lombok.Setter;


import lombok.extern.slf4j.Slf4j;


import java.util.function.Consumer;


@Slf4j


public class StringEventHandler implements EventHandler<StringEvent> {


public StringEventHandler(Consumer<?> consumer) {


this.consumer = consumer;


}


// 外部可以传入 Consumer 实现类,每处理一条消息的时候,consumer 的 accept 方法就会被执行一次


private Consumer<?> consumer;


@Override


public void onEvent(StringEvent event, long sequence, boolean endOfBatch) throws Exception {


log.info("sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event);


// 这里延时 100ms,模拟消费事件的逻辑的耗时


Thread.sleep(100);


// 如果外部传入了 consumer,就要执行一次 accept 方法


if (null!=consumer) {


consumer.accept(null);


}


}


}

[](()事件生产者

  • 每当业务要生产一个事件时,就会调用事件生产者的 onData 方法,将业务数据作为入参传进来,此时生产者会从环形队列中取出一个事件实例(就是前面的事件工厂创建的),把业务数据传给这个实例,再把实例正式发布出去:


package com.bolingcavalry.service;


import com.lmax.disruptor.RingBuffer;

用户头像

还未添加个人签名 2022.04.13 加入

还未添加个人简介

评论

发布
暂无评论
disruptor笔记之一:快速入门_Java_爱好编程进阶_InfoQ写作社区