写点什么

手把手教你学 Dapr - 6. 发布订阅

  • 2022 年 1 月 19 日
  • 本文字数:5040 字

    阅读完需:约 17 分钟

介绍

发布/订阅模式允许微服务使用消息相互通信。生产者或发布者在不知道哪个应用程序将接收它们的情况下向主题发送消息。这涉及将它们写入输入通道。同样,消费者或订阅者订阅该主题并接收其消息,而不知道是什么服务产生了这些消息。这涉及从输出通道接收消息。中间消息代理负责将每条消息从输入通道复制到所有对该消息感兴趣的订阅者的输出通道。当您需要将微服务彼此分离时,这种模式特别有用。


Dapr 中的发布/订阅 API 提供至少一次(at-least-once)的保证,并与各种消息代理和队列系统集成。 您的服务所使用的特定实现是可插入的,并被配置为运行时的 Dapr Pub/Sub 组件。 这种方法消除了您服务的依赖性,从而使您的服务可以更便携,更灵活地适应更改。



Dapr 发布/订阅构建块提供了一个与平台无关的 API 来发送和接收消息。您的服务将消息发布到命名主题,并订阅主题以使用这些消息。


下图显示了一个“shipping”服务和一个“email”服务的例子,它们都订阅了“cart”服务发布的主题。每个服务都会加载指向同一发布/订阅消息总线组件的发布/订阅组件配置文件,例如 Redis Streams、NATS Streaming、Azure Service Bus 或 GCP Pub/Sub。



下图具有相同的服务,但是这次显示的是 Dapr 发布 API,它发送“订单”主题和订阅服务上的订单端点,这些主题消息由 Dapr 发布到。


特性

Cloud Events 消息格式

为了启用消息路由并为每条消息提供额外的上下文,Dapr 使用 CloudEvents 1.0 规范作为其消息格式。应用程序使用 Dapr 发送到主题的任何消息都会自动“包装”在 Cloud Events 信封中,使用 datacontenttype 属性的 Content-Type 标头值。


Dapr 实现了以下 Cloud Events 字段:


  • id

  • source

  • specversion

  • type

  • datacontenttype (Optional)


以下示例显示了 CloudEvent v1.0 中序列化为 JSON 的 XML 内容:


{    "specversion" : "1.0",    "type" : "xml.message",    "source" : "https://example.com/message",    "subject" : "Test XML Message",    "id" : "id-1234-5678-9101",    "time" : "2020-09-23T06:23:21Z",    "datacontenttype" : "text/xml",    "data" : "<note><to>User1</to><from>user2</from><message>hi</message></note>"}
复制代码

消息订阅

Dapr 应用程序可以订阅已发布的主题。 Dapr 允许您的应用程序订阅主题的两种方法:


  • 声明式,其中订阅在外部文件中定义

  • 程序化,在用户代码中定义订阅

消息传递

原则上,当订阅者在处理消息后以非错误响应进行响应时,Dapr 认为消息已成功传递。为了进行更精细的控制,Dapr 的发布/订阅 API 还提供了在响应负载中定义的显式状态,订阅者可以使用这些状态向 Dapr 指示特定的处理指令(例如 RETRY 或 DROP)。如果两个不同的应用程序(不同的 app-ID)订阅了同一个主题,Dapr 将每条消息只传递给每个应用程序的一个实例。


主题范围

默认情况下,所有支持 Dapr 发布/订阅组件(例如 Kafka、Redis Stream、RabbitMQ)的主题都可用于配置了该组件的每个应用程序。为了限制哪个应用程序可以发布或订阅主题,Dapr 提供了主题范围。这使您能够允许应用程序发布哪些主题以及允许应用程序订阅哪些主题。

消息生存时间(TTL)

Dapr 可以在每条消息的基础上设置超时消息,这意味着如果没有从 pub/sub 组件读取消息,则该消息将被丢弃。这是为了防止堆积未读的消息。在队列中比配置的 TTL 时间长的消息称为死消息。


:也可以在组件创建时为给定队列设置消息 TTL。

与不使用 Dapr 和 CloudEvents 的应用程序通信

对于一个应用程序使用 Dapr 而另一个应用程序不使用的场景,可以为发布者或订阅者禁用 CloudEvent 包装。这允许在不能一次全部采用 Dapr 的应用程序中部分采用 Dapr 发布订阅。

使用.Net 调用 Dapr 的发布订阅

以下示例创建应用程序来发布和订阅名为 deathStarStatus 的主题


先决条件

运行 dapr init 时,Redis Streams 默认安装在本地机器上。如果是 dapr init --slim 需要自己动手操作一些东西了,这里就不演示了。


通过打开您的组件文件进行验证 %UserProfile%\.dapr\components\pubsub.yaml


apiVersion: dapr.io/v1alpha1kind: Componentmetadata:  name: pubsubspec:  type: pubsub.redis  version: v1  metadata:  - name: redisHost    value: localhost:6379  - name: redisPassword    value: ""
复制代码


:这里 redisHost 的 value 可以根据实际情况设置,比如某云的 redis 实例等。又有人问过是否可以切换默认 db,当然可以,name 设置redisDB,value 设置为你要使用的 db 即可


如果你要更详细的 yaml 配置参数,比如并发设置、最大重试次数等等都可以看这里 https://docs.dapr.io/reference/components-reference/supported-pubsub/setup-redis-pubsub/

订阅主题

订阅主题有两种方式:声明式、程序化

声明式订阅

您可以使用以下自定义资源定义 (CRD) 订阅主题。创建一个名为 subscription.yaml 的文件并粘贴以下内容:


apiVersion: dapr.io/v1alpha1kind: Subscriptionmetadata:  name: myevent-subscriptionspec:  topic: deathStarStatus  route: /dsstatus  pubsubname: pubsubscopes:- app1- app2
复制代码


上面的示例显示了对 pubsub 组件 pubsub 主题 deathStarStatus 的事件订阅。


将 CRD 文件放到组件目录即可,这里不继续展开说了。

程序化订阅

Dapr 实例在启动时调用您的应用程序并期待主题订阅的 JSON 响应:


  • pubsubname:Dapr 应该使用哪个 pub/sub 组件

  • topic:订阅哪个主题

  • route:当消息涉及该主题时,Dapr 调用哪个 endpoint


:你可能会觉得,这是不是很麻烦?是的,所以我们用 dapr dotnet-sdk 来帮助我们自动完成这些事情

.Net 订阅

以上是让 dapr sidecar 知道这个消息的订阅最终给谁。但我们的程序里要怎么写呢?


如果你选择的是声明式订阅,你做一个 route 即可,而如果是程序化订阅则不需要多写一个 yaml 文件,且通过特性即可支持,接下来看看.Net SDK 怎么做的吧。

创建 Assignment.Server(Sub)

创建ASP.NET Core 空项目,同时根据之前的文章内容添加Dapr.AspNetCoreNuGet 包和修改程序端口为 5000


修改 program.cs 代码


using Microsoft.AspNetCore.Mvc;
var builder = WebApplication.CreateBuilder(args);var app = builder.Build();
app.UseRouting();app.UseCloudEvents();app.UseEndpoints(endpoints =>{ endpoints.MapSubscribeHandler();});
app.MapPost("/dsstatus", ([FromBody] string word) => Console.WriteLine($"Hello {word}!")).WithTopic("pubsub", "deathStarStatus");
app.Run();
复制代码


:为了告诉 Dapr 消息已成功处理,请返回 200 OK 响应。如果 Dapr 收到除 200 之外的任何其他返回状态代码,或者如果您的应用程序崩溃,Dapr 将尝试按照 At-Least-Once 语义重新传递消息。

运行 Assignment.Server

使用 Dapr CLI 来启动,先使用命令行工具跳转到目录 dapr-study-room\Assignment06\Assignment.Server,然后执行下面命令


dapr run --app-id testpubsub --app-port 5000 --dapr-http-port 3500 --dapr-grpc-port 50001 dotnet run
复制代码

创建 Assignment.Client(Publish)

创建控制台项目,并修改 program.cs


var client = new Dapr.Client.DaprClientBuilder().Build();await client.PublishEventAsync<string>("pubsub", "deathStarStatus", "World");
复制代码


运行 Assignment.Client 即可看到 Assignment.Server 中会打印Hello World!

将消息路由到不同的事件处理程序

基于内容的路由是一种使用 DSL 而不是命令式应用程序代码的消息传递模式。PubSub 路由是此模式的一种实现,它允许开发人员使用表达式根据 CloudEvents 的内容将其路由到应用程序中的不同 URI/路径和事件处理程序。


:这是个预览功能,如果你感兴趣可自行尝试,值得一提的是 Common Expression Language (CEL)很有趣,这里就只贴一段代码看看吧。


        [Topic("pubsub", "inventory", "event.type ==\"widget\"", 1)]        [HttpPost("widgets")]        public async Task<ActionResult<Stock>> HandleWidget(Widget widget, [FromServices] DaprClient daprClient)        {            // Logic            return stock;        }
[Topic("pubsub", "inventory", "event.type ==\"gadget\"", 2)] [HttpPost("gadgets")] public async Task<ActionResult<Stock>> HandleGadget(Gadget gadget, [FromServices] DaprClient daprClient) { // Logic return stock; }
[Topic("pubsub", "inventory")] [HttpPost("products")] public async Task<ActionResult<Stock>> HandleProduct(Product product, [FromServices] DaprClient daprClient) { // Logic return stock; }
复制代码

发布/订阅主题访问权限

命名空间或组件范围可用于限制对特定应用程序的组件访问。添加到组件的这些应用程序范围仅限制具有特定 ID 的应用程序能够使用该组件。


除了这个通用组件范围之外,发布/订阅组件还可以限制以下内容:


  • 可以使用哪些主题(已发布或已订阅)

  • 允许哪些应用程序发布到特定主题

  • 允许哪些应用程序订阅特定主题

主题访问权限

apiVersion: dapr.io/v1alpha1kind: Componentmetadata:  name: pubsub  namespace: defaultspec:  type: pubsub.redis  version: v1  metadata:  - name: redisHost    value: "localhost:6379"  - name: redisPassword    value: ""  - name: publishingScopes    value: "app1=topic1;app2=topic2,topic3;app3="  - name: subscriptionScopes    value: "app2=;app3=topic1"
复制代码


下表显示了允许哪些应用程序发布到主题中:

下表显示了允许哪些应用程序订阅主题:


:如果应用程序未列出(例如 subscriptionScopes 中的 app1),则允许订阅所有主题。由于未使用 allowedTopics 且 app1 没有任何订阅范围,因此它还可以使用上面未列出的其他主题。

限制允许的主题

如果 Dapr 应用程序向其发送消息,则会创建一个主题。在某些情况下,应该控制这个主题的创建。例如:


  • 在 Dapr 应用程序中,在生成主题名称时出现的错误可能导致创建无限数量的主题

  • 精简主题名称和总数,防止主题无限增长


apiVersion: dapr.io/v1alpha1kind: Componentmetadata:  name: pubsub  namespace: defaultspec:  type: pubsub.redis  version: v1  metadata:  - name: redisHost    value: "localhost:6379"  - name: redisPassword    value: ""  - name: allowedTopics    value: "topic1,topic2,topic3"
复制代码

结合 allowedTopics 和 scopes

有时您希望组合这两个作用域,因此只允许一组固定的主题,并将作用域指定给特定的应用程序。


apiVersion: dapr.io/v1alpha1kind: Componentmetadata:  name: pubsub  namespace: defaultspec:  type: pubsub.redis  version: v1  metadata:  - name: redisHost    value: "localhost:6379"  - name: redisPassword    value: ""  - name: allowedTopics    value: "A,B"  - name: publishingScopes    value: "app1=A"  - name: subscriptionScopes    value: "app1=;app2=A"
复制代码


:第三个应用程序没有列出,因为如果一个应用程序没有在范围内指定,它是允许使用所有主题的。


下表显示了允许发布到主题的应用程序:

下表显示了哪个应用程序可以订阅主题:

消息 TTL

同状态管理,使用 metadatattlInSeconds

本章源码

Assignment06


https://github.com/doddgu/dapr-study-room

我们正在行动,新的框架、新的生态

我们的目标是自由的易用的可塑性强的功能丰富的健壮的


所以我们借鉴 Building blocks 的设计理念,正在做一个新的框架MASA Framework,它有哪些特点呢?


  • 原生支持 Dapr,且允许将 Dapr 替换成传统通信方式

  • 架构不限,单体应用、SOA、微服务都支持

  • 支持.Net 原生框架,降低学习负担,除特定领域必须引入的概念,坚持不造新轮子

  • 丰富的生态支持,除了框架以外还有组件库、权限中心、配置中心、故障排查中心、报警中心等一系列产品

  • 核心代码库的单元测试覆盖率 90%+

  • 开源、免费、社区驱动

  • 还有什么?我们在等你,一起来讨论


经过几个月的生产项目实践,已完成 POC,目前正在把之前的积累重构到新的开源项目中


目前源码已开始同步到 Github(文档站点在规划中,会慢慢完善起来):


MASA.BuildingBlocks


MASA.Contrib


MASA.Utils


MASA.EShop


BlazorComponent


MASA.Blazor


QQ 群:7424099


微信群:加技术运营微信(MasaStackTechOps),备注来意,邀请进群


MASA 技术团队:(鬼谷子)

发布于: 刚刚阅读数: 2
用户头像

还未添加个人签名 2021.10.26 加入

还未添加个人简介

评论

发布
暂无评论
手把手教你学Dapr - 6. 发布订阅