写点什么

Spring Cloud Stream 编程模型的基础知识,很多老司机都不知道

  • 2021 年 11 月 10 日
  • 本文字数:3382 字

    阅读完需:约 11 分钟


order。setStatus (OrderStatus . REJECTED) ;



return order;



? ? ?}



}


有条件地使用消息


========


假设开发人员希望以不同方式处理传入同一消息通道的消息,则可以使用条件分派。Spring Cloud Stream 支持根据条件将消息分派给在输入通道上注册的多个 @StreamListener 方法。该条件是在 @StreamListener 注解的 condition 属性中定义的 Spring 表达式语言(Spring Expression Language, SpEL) 表达式。


public boolean send(Order order) {



Message<Order> orderMessage =



MessageBuilder . withPayload (order) .build() ;



orde rMessage . getHeaders () . put ("processor", "account") ;



return this. source. output ()。send (orderMessage) ;



}


以下就是一个示例实现,它定义了两个使用 @StreamListener 注解的方法,这些方法侦听同一主题。其中一个专用于从 account-ervice 服务传入的消息,而第二个则专用于 product-service 服务。传入消息将根据带有 processor 名称的标头进行分派。


@SpringBootApplication



@EnableDi scoveryClient



@EnableBinding (Processor.class)



public class OrderApplication {



@StreamListener (target = Processor . INPUT, condition =



"headersI'processor']=-'account'")



public void receiveorder (Order order) throws JsonProcessingException



LOGGER. info ("Order received from account: { } ",



mapper.writeValueAsString(order) );



? ? ? ?// ...



}



@StreamListener (target 一 Processor. INPUT, condition =



"headers[ 'processor']--'product'")



public void receiveOrder (Order order) throws JsonProcess ingException



LOGGER. info("Order received from product: { }",



mapper . writeValueAsString (order) );



? ? ? ? ?// ...



? ? }??



}


使用 Apache Kafka


==============


在讨论 Spring Cloud 与消息代理的集成时,我们曾经多次提到过 Apache Kafka。 但是,到目前为止,我们还没有基于该平台运行任何示例。事实上, RabbitMQ 在使用 SpringCloud 项目时往往是首选,但是 Kafka 也值得我们关注。与 RabbitMQ 相比,它的一个优势是对分区的原生支持,而分区正是 Spring Cloud Stream 最重要的功能之一。


Kaf


《Android学习笔记总结+最新移动架构视频+大厂安卓面试真题+项目实战源码讲义》
浏览器打开:qq.cn.hn/FTe 免费领取
复制代码


ka 不是典型的消息代理。它是一个分布式流媒体平台。它的主要功能是允许开发人员发布和订阅记录(Record)流。它对转换或响应数据流的实时流应用程序特别有用。它通常作为由一个或多个服务器组成的集群运行,并可以在主题中存储记录流。


运行 Kafka


=======


糟糕的是,Apache Kafka 没有正式的 Docker 镜像。但是,我们可以使用一个非官方的,如 Spotify 共享的镜像。与其他可用的 Kafka docker 镜像相比,这个镜像可以在同一容器中运行 Zookeeper 和 Kafka。以下是启动 Kafka 并在端口 9092.上公开它的 Docker 命令。在端口 2181 上也可以使用 Zookeeper.


docker run -d --name kafka -P 2181:2181 -P 9092:9092 --env



ADVERTISED HOST=192 .168.99.100 --env ADVERTISED PORT=9092 spotify/kafka


自定义应用程序设置


=========


要为应用程序启用 Apache Kafka, 需要将 spring- cloud-starter- stream-kafka 启动程序包含在依赖项中。我们当前的示例非常类似于发布/订阅的示例,因为它使用了 RabbitMQ 发布/订阅,以及在前一篇“发布/订阅模型”中介绍过的分组和分区机制。唯一的区别在于依赖项和配置设置。


SpringCloudStream 将自动检测并使用类路径中找到的绑定器。可以使用 spring kafka.*属性覆盖连接设置。在这种情况下中,只需要将自动配置的 Kafka 客户端地址更改为 Docker 机器地址 192.168.99.100。 对 Zookeeper 也应该执行相同的修改,因为 Zookeeper 将由 Kafka 客户端使用。


spring:



application:



name: order-service



kafka:



bootstrap-servers: 192.168.99.100:9092



cloud:



stream:



bindings:



output :



destination: orders-out



producer:



partitionKeyExpression: payload. customerId



partitionCount: 2



input:



destination: orders-in



kafka:



binder:



zkNodes: 192.168.99.100


在启动发现、网关和所有必需的微服务实例后,即可执行与先前示例相同的测试。如果一切配置正确,则应该在应用程序启动期间在日志中看到以下片段。其测试结果与基于 RabbitMQ 的示例完全相同。


16:58:30.008 INFO [,] Discovered coordinator 192.168. 99.100:9092



(id: 2147483647 rack: null) for group account.



16:58:30.038 INFO [,] Successfully joined group account with generation 1



16:58:30.039 INFO [,] Setting newly assigned partitions



[orders-out-0, orders-out-1] for group account



16:58:30.081 INFO [,] partitions assigned:



[orders-out-0, orders-out-1]


Kafka Streams API 支持


====================


Spring Cloud Stream Kafka 可以提供专为 Kafka Streams 绑定设计的绑定器。使用此绑定器之后,应用程序即可利用 Kafka Streams API。要为应用程序启用此类功能,需要在项目中包含以下依赖项。


<dependency>



<groupId>org . springframework. cloud</groupId>



<artifactId>spring-cloud-stream-binder- kstream</artifactId>



</dependency>


Kafka Streams API 可以提供高级流 DSL。可以通过声明 @StreamListener 方法将 KStream 接口作为参数来访问它。KStream 为流操作提供了一些有用的方法,这些方法来自其他流 API,如 map、flatMap、join 或 filter。还有一些 与 Kafka Stream 相关的其他方法,如 t.(... (用于向主题发送流)或 through..(与 to 相同,但也会从主题创建 KStream 的新实例) 。


@SpringBootApplication



@EnableBinding (KStreamProcessor .class)



public class AccountApplication {



@StreamListener ("input")



@SendTo ("output")



public KStream<?, order> process (KStream<?, Order> input) {



? ? // ..



}



public static void main(String[] args) {



SpringApplication. run (AccountApplication.class, args);



? ? }



}


配置属性


====


在讨论示例应用程序的实现之前,我们已经介绍了 Kafka 的一些 Spring Cloud 配置设置。表 11.5 包含了一些最重要的属性,可以设置这些属性来自定义 Apache Kafka 绑定器。所有这些属性都以 spring .cloud.stream. kafka.binder 为前缀。



多个绑定器


=====


在 Spring Cloud Stream 术语中,可以实现以提供与外部中间件的物理目标的连接的接口称为绑定器(Binder)。目前,有两种可用的内置绑定器实现一 Kafka 和 RabbitMQ。如果想要提供自定义绑定器库,那么关键接口就是 Binder (这个关键接口其实就是作为将输入和输出连接到外部中间件的策略的抽象),它有两个方法一 bindConsumer 和 bindProducer。有关更多详细信息,请参阅 Spring Cloud Stream 规范。


对开发人员来说,重要的是能够在单个应用程序中使用多个绑定器。我们甚至可以混合使用不同的实现,如 RabbitMQ 和 Kafka. Spring Cloud Stream 依赖于 Spring Boot 在绑定过程中的自动配置。类路径上可用的实现将自动使用。如果想要同时使用默认的绑定器,请在项目中包含以下依赖项。


<dependency>



? <groupId>org。spr ingframework. cloud</groupId>



? <artifactId>spring-cloud-st ream-binder- rabbit</artifactId>



</ dependency>



<dependency>



<groupId>org. spr ingframework. cloud</groupId>



?<artifactId> spring-cloud-stream-binder- kafka</artifactId>



</dependency>


如果在类路径中找到了多个绑定器,则应用程序必须检测应将哪个绑定器用于特定通道绑定。我们可以使用 spring .cloud. stream defaultBinder 属性全局配置默认绑定器,或者使用 spring.cloud stream. bindings.<channelName> .binder 属性为每个通道单独配置默认绑定器。现在不妨回到之前的示例,在那里配置多个绑定器。我们需要为 account-service 服务和 order. service 服务之间的直接通信定义 RabbitMQ,并为 order-service 服务和其他微服务之间的发布/订阅模型定义 Kafka。


以下是与 publish subscribe 分支( hts://github.?com/piomin/sample-spring-cloud-messaging/tree/publish subscribe) 中的 account-service 相同的配置,但它基于两个不同的绑定器。


spring:



cloud:



stream:



bindings:



output:



destination: orders-in



binder: rabbit1



input :



consumer :



partitioned: true



destination: orders -out



binder: kafkal



group:. account



rabbit:



bindings:



output:



producer :



exchangeType: direct



rout ingKeyExpression: ' “#”'

评论

发布
暂无评论
Spring Cloud Stream 编程模型的基础知识,很多老司机都不知道