写点什么

Spring cloud stream【入门介绍】,java 开发实例大全云盘

  • 2021 年 11 月 10 日
  • 本文字数:2313 字

    阅读完需:约 8 分钟

<plugins>


<plugin>


<groupId>org.springframework.boot</groupId>


<artifactId>spring-boot-maven-plugin</artifactId>


</plugin>


</plugins>


</build>


</project>

1.3 配置文件

配置文件中除了必要的服务名称端口Eureka 的信息外我们还要添加 RabbitMQ 的注册信息


spring.application.name=stream-sender


server.port=9060


#设置服务注册中心地址,指向另一个注册中心


eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/


#rebbitmq 链接信息


spring.rabbitmq.host=192.168.88.150


spring.rabbitmq.port=5672


spring.rabbitmq.username=dpb


spring.rabbitmq.password=123


spring.rabbitmq.virtualHost=/

1.4 创建消费发送者接口

创建一个发送消息的接口。具体如下:方法名称自定义,返回类型必须是 SubscribableChannel,在 Output 注解中指定交换器名称。


/**


  • 发送消息的接口

  • @author dengp


*/


public interface ISendeService {


/**


  • 指定输出的交换器名称

  • @return


*/


@Output("dpb-exchange")


SubscribableChannel send();


}

1.5 启动类

在启动类中通过 @EnableBinding 注解绑定我们创建的接口类。


@SpringBootApplication


@EnableEurekaClient


// 绑定我们刚刚创建的发送消息的接口类型


@EnableBinding(value={ISendeService.class})


public class StreamSenderStart {


public static void main(String[] args) {


SpringApplication.run(StreamSenderStart.class, args);


}


}


2.创建消息消费者服务



2.1 创建项目

2.2 pom 文件

添加的依赖和发送消息的服务是一致的


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"


xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">


<modelVersion>4.0.0</modelVersion>


<parent>


<groupId>org.springframework.boot</groupId>


<artifactId>spring-boot-starter-parent</artifactId>


<version>1.5.13.RELEASE</version>


</parent>


<groupId>com.bobo</groupId>


<artifactId>stream-receiver</artifactId>


<version>0.0.1-SNAPSHOT</version>


<dependencyManagement>


<dependencies>


<dependency>


<groupId>org.springframework.cloud</groupId>


<artifactId>spring-cloud-dependencies</artifactId>


<version>Dalston.SR5</version>


<type>pom</type>


<scope>import</scope>


</dependency>


</dependencies>


</dependencyManagement>


<dependencies>


<dependency>


<groupId>org.springframework.boot</groupId>


<artifactId>spring-boot-starter-web</artifactId>


</dependency>


<dependency>


<groupId>org.springframework.cloud</groupId>


<artifactId>spring-cloud-starter-eureka</artifactId>


</dependency>


<dependency>


<groupId>org.springframework.cloud</groupId>


<artifactId>spring-cloud-starter-stream-rabbit</artifactId>


</dependency>


</dependencies>


<build>


<plugins>


<plugin>


<groupId>org.springframework.boot</groupId>


<artifactId>spring-boot-maven-plugin</artifactId>


</plugin>


</plugins>


</build>


</project>

2.3 配置文件

注意修改服务名称和端口


spring.application.name=stream-receiver


server.port=9061


#设置服务注册中心地址,指向另一个注册中心


eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/


#rebbitmq 链接信息


spring.rabbitmq.host=192.168.88.150


spring.rabbitmq.port=5672


spring.rabbitmq.username=dpb


spring.rabbitmq.password=123


spring.rabbitmq.virtualHost=/

2.4 创建接收消息的接口

此接口和发送消息的接口相似,注意使用的是 @Input 注解。


/**


  • 接收消息的接口

  • @author dengp


*/


public interface IReceiverService {


/**


  • 指定接收的交换器名称

  • @return


*/


@Input("dpb-exchange")


SubscribableChannel receiver();


}

2.5 创建处理消息的处理类

注意此类并不是实现上面创建的接口。而是通过 @EnableBinding 来绑定我们创建的接口,同时通过 @StreamListener 注解来监听 dpb-exchange 对应的消息服务


/**


  • 具体接收消息的处理类

  • @author dengp


*/


@Service


@EnableBinding(IReceiverService.class)


public class ReceiverService {


@StreamListener("dpb-exchange")


public void onReceiver(byte[] msg){


System.out.println("消费者:"+new String(msg));


}


}

2.6 启动类

同样要添加 @EnableBinding 注解


@SpringBootApplication


@EnableEurekaClient


@EnableBinding(value={IReceiverService.class})


public class StreamReceiverStart {


public static void main(String[] args) {


SpringApplication.run(StreamReceiverStart.class, args);


}


}


3.编写测试代码




通过单元测试来测试服务。


import org.junit.Test;


import org.junit.runner.RunWith;


import org.springframework.beans.factory.annotation.Autowired;


import org.springframework.boot.test.context.SpringBootTest;


import org.springframework.messaging.Message;


import org.springframework.messaging.support.MessageBuilder;


import org.springframework.test.context.junit4.SpringRunner;


import com.bobo.stream.StreamSenderStart;


import com.bobo.stream.sender.ISendeService;


@RunWith(SpringRunner.class)


@SpringBootTest(classes=StreamSenderStart.class)


public class StreamTest {


@Autowired


private ISendeService sendService;


@Test


public void testStream(){


String msg = "hello stream ...";


// 将需要发送的消息封装为 Message 对象


Message message = MessageBuilder


.withPayload(msg.getBytes())


.build();

评论

发布
暂无评论
Spring cloud stream【入门介绍】,java开发实例大全云盘