Spring cloud stream【入门介绍】,java 开发实例大全云盘
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
1.3 配置文件
配置文件中除了必要的服务名称,端口和 Eureka 的信息外我们还要添加 RabbitMQ 的注册信息
spring.application.name=stream-sender
server.port=9060
#设置服务注册中心地址,指向另一个注册中心
eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/
#rebbitmq 链接信息
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/
1.4 创建消费发送者接口
创建一个发送消息的接口。具体如下:方法名称自定义,返回类型必须是 SubscribableChannel,在 Output 注解中指定交换器名称。
/**
发送消息的接口
@author dengp
*/
public interface ISendeService {
/**
指定输出的交换器名称
@return
*/
@Output("dpb-exchange")
SubscribableChannel send();
}
1.5 启动类
在启动类中通过 @EnableBinding 注解绑定我们创建的接口类。
@SpringBootApplication
@EnableEurekaClient
// 绑定我们刚刚创建的发送消息的接口类型
@EnableBinding(value={ISendeService.class})
public class StreamSenderStart {
public static void main(String[] args) {
SpringApplication.run(StreamSenderStart.class, args);
}
}
2.1 创建项目
2.2 pom 文件
添加的依赖和发送消息的服务是一致的
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.13.RELEASE</version>
</parent>
<groupId>com.bobo</groupId>
<artifactId>stream-receiver</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Dalston.SR5</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2.3 配置文件
注意修改服务名称和端口
spring.application.name=stream-receiver
server.port=9061
#设置服务注册中心地址,指向另一个注册中心
eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/
#rebbitmq 链接信息
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/
2.4 创建接收消息的接口
此接口和发送消息的接口相似,注意使用的是 @Input 注解。
/**
接收消息的接口
@author dengp
*/
public interface IReceiverService {
/**
指定接收的交换器名称
@return
*/
@Input("dpb-exchange")
SubscribableChannel receiver();
}
2.5 创建处理消息的处理类
注意此类并不是实现上面创建的接口。而是通过 @EnableBinding 来绑定我们创建的接口,同时通过 @StreamListener 注解来监听 dpb-exchange 对应的消息服务
/**
具体接收消息的处理类
@author dengp
*/
@Service
@EnableBinding(IReceiverService.class)
public class ReceiverService {
@StreamListener("dpb-exchange")
public void onReceiver(byte[] msg){
System.out.println("消费者:"+new String(msg));
}
}
2.6 启动类
同样要添加 @EnableBinding 注解
@SpringBootApplication
@EnableEurekaClient
@EnableBinding(value={IReceiverService.class})
public class StreamReceiverStart {
public static void main(String[] args) {
SpringApplication.run(StreamReceiverStart.class, args);
}
}
通过单元测试来测试服务。
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;
import com.bobo.stream.StreamSenderStart;
import com.bobo.stream.sender.ISendeService;
@RunWith(SpringRunner.class)
@SpringBootTest(classes=StreamSenderStart.class)
public class StreamTest {
@Autowired
private ISendeService sendService;
@Test
public void testStream(){
String msg = "hello stream ...";
// 将需要发送的消息封装为 Message 对象
Message message = MessageBuilder
.withPayload(msg.getBytes())
.build();
评论