写点什么

手把手教你学 Dapr - 7. Actors

  • 2022 年 1 月 19 日
  • 本文字数:7619 字

    阅读完需:约 25 分钟

介绍

Actor 模式将 Actor 描述为最低级别的“计算单元”。换句话说,您在一个独立的单元(称为 actor)中编写代码,该单元接收消息并一次处理一个消息,没有任何并发或线程。


再换句话说,根据ActorId划分独立计算单元后,相同的ActorId重入要排队,可以理解为lock(ActorId)

:这里有个反例,就是重入性的引入,这个概念目前还是 Preview,它允许同一个链内可以重复进入,判断的标准不止是ActorId这么简单,即自己调自己是被允许的。这个默认是关闭的,需要手动开启,即默认不允许自己调自己


当您的代码处理一条消息时,它可以向其他参与者发送一条或多条消息,或者创建新的参与者。底层运行时管理每个参与者运行的方式、时间和地点,并在参与者之间路由消息。


大量的 Actor 可以同时执行,Actor 彼此独立执行。


Dapr 包含一个运行时,它专门实现了 Virtual Actor 模式。 通过 Dapr 的实现,您可以根据 Actor 模型编写 Dapr Actor,而 Dapr 利用底层平台提供的可扩展性和可靠性保证。

什么时候用 Actors

Actor 设计模式非常适合许多分布式系统问题和场景,但您首先应该考虑的是该模式的约束。一般来说,如果出现以下情况,请考虑使用 Actors 模式来为您的问题或场景建模:


  • 您的问题空间涉及大量(数千个或更多)小的、独立且孤立的状态和逻辑单元

  • 您希望使用需要与外部组件进行大量交互的单线程对象,包括跨一组 Actors 查询状态。

  • 您的 Actor 实例会通过发出 I/O 操作来阻塞具有不可预测延迟的调用者。

Dapr Actor

每个 Actor 都被定义为 Actor 类型的实例,就像对象是类的实例一样。 例如,可能有一个执行计算器功能的 Actor 类型,并且可能有许多该类型的 Actor 分布在集群的各个节点上。每个这样的 Actor 都由一个 Acotr ID 唯一标识。


生命周期

Dapr Actors 是虚拟的,这意味着他们的生命周期与他们的内存表现无关。因此,它们不需要显式创建或销毁。Dapr Actors 运行时在第一次收到对该 Actor ID 的请求时会自动激活该 Actor。如果一个 Actor 在一段时间内没有被使用,Dapr Actors 运行时就会对内存中的对象进行垃圾回收。如果稍后需要重新激活,它还将保持对参与者存在的了解。如果稍后需要重新激活,它还将保持对 Actor 的一切原有数据。


调用 Actor 方法和提醒会重置空闲时间,例如提醒触发将使 Actor 保持活跃。无论 Actor 是活跃还是不活跃,Actor 提醒都会触发,如果为不活跃的 Actor 触发,它将首先激活演员。Actor 计时器不会重置空闲时间,因此计时器触发不会使 Actor 保持活动状态。计时器仅在 Actor 处于活动状态时触发。


Reminders 和 Timers 最大的区别就是 Reminders 会保持 Actor 的活动状态,而 Timers 不会


Dapr 运行时用来查看 Actor 是否可以被垃圾回收的空闲超时和扫描间隔是可配置的。当 Dapr 运行时调用 Actor 服务以获取支持的 Actor 类型时,可以传递此信息。


由于 Virtual Actor 模型的存在,这种 Virtual Actor 生命周期抽象带来了一些注意事项,事实上,Dapr Actors 实现有时会偏离这个模型。


第一次将消息发送到 Actor ID 时,Actor 被自动激活(导致构建 Actor 对象)。 经过一段时间后,Actor 对象将被垃圾回收。被回收后再次使用 Actor ID 将导致构造一个新的 Actor 对象。 Actor 的状态比对象的生命周期长,因为状态存储在 Dapr 运行时配置的状态管理组件中。


:Actor 被垃圾回收之前,Actor 对象是会复用的。这里会导致一个问题,在.Net Actor 类中,构造函数在 Actor 存活期间只会被调用一次。

分发和故障转移

为了提供可扩展性和可靠性,Actor 实例分布在整个集群中,Dapr 根据需要自动将它们从故障节点迁移到健康节点。


Actors 分布在 Actor 服务的实例中,而这些实例分布在集群中的节点之间。 对于给定的 Actor 类型,每个服务实例都包含一组 Actor。

Dapr 安置服务(Placement Service)

Dapr Actor 运行时为您管理分发方案和密钥范围设置。这是由 Actor Placement 服务完成的。创建服务的新实例时,相应的 Dapr 运行时会注册它可以创建的 Actor 类型,并且安置服务会计算给定 Actor 类型的所有实例的分区。每个 Actor 类型的分区信息表被更新并存储在环境中运行的每个 Dapr 实例中,并且可以随着 Actor 服务的新实例的创建和销毁而动态变化。这如下图所示:



当客户端调用具有特定 ID 的 Actor(例如,Actor ID 123)时,客户端的 Dapr 实例会 Hash Actor 类型和 ID,并使用该信息调用可以为特定 Actor ID 的请求提供服务的相应 Dapr 实例。因此,始终为任何给定的 Actor ID 调用相同的分区(或服务实例)。这如下图所示:



这简化了一些选择,但也带来了一些考虑:


  • 默认情况下,Actor 随机放置到 pod 中,从而实现均匀分布。

  • 因为 Actor 是随机放置的,应该可以预料到 Actor 操作总是需要网络通信,包括方法调用数据的序列化和反序列化,产生延迟和开销。


:Dapr Actor 放置服务仅用于 Actor 放置,因此如果您的服务不使用 Dapr Actors,则不需要。 放置服务可以在所有托管环境中运行,包括自托管和 Kubernetes。

Actor 通讯

您可以通过 HTTP/gRPC 调用 Actor,当然也可以用 SDK。


POST/GET/PUT/DELETE http://localhost:3500/v1.0/actors/<actorType>/<actorId>/<method/state/timers/reminders>
复制代码

并发

Dapr Actor 运行时为访问 Actor 方法提供了一个简单的回合制(turn-basesd)的访问模型。这意味着在任何时候,Actor 对象的代码中都不能有超过一个线程处于活动状态。


单个Actor实例一次不能处理多个请求。如果预期要处理并发请求,Actor 实例可能会导致吞吐量瓶颈。


单个 Actor 实例指每个 Actor ID 对应的 Actor 对象。单个 Actor 不并发就没有问题


如果两个 Actor 之间存在循环请求,而同时向其中一个 Actor 发出外部请求,则 Actor 之间可能会陷入僵局。Dapr Actor 运行时自动超时 Actor 调用并向调用者抛出异常以中断可能的死锁情况。


重入性(Preview)

作为对 dapr 中基础 Actor 的增强。现在重入性为预览功能,感兴趣的小伙伴可以到看官方文档。

回合制访问(Turn-based access)

一个回合包括一个 Actor 方法的完整执行以响应来自其他 Actor 或客户端的请求,或者一个计时器/提醒回调的完整执行。即使这些方法和回调是异步的,Dapr Actor 运行时也不会将它们交叉。一个回合必须完全完成后,才允许进行新的回合。换句话说,当前正在执行的 Actor 方法或计时器/提醒回调必须完全完成,才能允许对方法或回调的新调用。


Dapr Actor 运行时通过在回合开始时获取每个 Actor 的锁并在回合结束时释放锁来实现基于回合的并发性。 因此,基于回合的并发是在每个 Actor 的基础上执行的,而不是跨 Actor。Actor 方法和计时器/提醒回调可以代表不同的 Actor 同时执行。


以下示例说明了上述概念。考虑实现两个异步方法(例如 Method1 和 Method2)、计时器和提醒的 Actor 类型。下图显示了代表属于此 Actor 类型的两个 Actors(ActorId1 和 ActorId2)执行这些方法和回调的时间线示例。


Actor 状态管理

Actor 可以使用状态管理功能可靠地保存状态。您可以通过 HTTP/gRPC 端点与 Dapr 交互以进行状态管理。


要使用 actor,您的状态存储必须支持事务。这意味着您的状态存储组件必须实现 TransactionalStore 接口。只有一个状态存储组件可以用作所有参与者的状态存储。


事务支持列表:https://docs.dapr.io/reference/components-reference/supported-state-stores/


:建议学习的时候都用Redis,官方所有的示例也都是基于 Redis,比较容易上手,且Dapr init默认集成

Actor 计时器和提醒

Actor 可以通过注册计时器或提醒来安排自己的定期工作。


计时器和提醒的功能非常相似。主要区别在于,Dapr Actor 运行时在停用后不保留有关计时器的任何信息,而使用 Dapr Actor 状态提供程序保留有关提醒的信息。


定时器和提醒的调度配置是相同的,总结如下:




DueTime 是一个可选参数,用于设置第一次调用回调之前的时间或时间间隔。如果省略 DueTime,则在定时器/提醒注册后立即调用回调。


支持的格式:


  • RFC3339 日期格式,例如2020-10-02T15:00:00Z

  • time.Duration 格式,例如2h30m

  • ISO 8601 持续时间格式,例如PT2H30M




period 是一个可选参数,用于设置两次连续回调调用之间的时间间隔。当以 ISO 8601-1 持续时间格式指定时,您还可以配置重复次数以限制回调调用的总数。如果省略 period,则回调将仅被调用一次。


支持的格式:


  • time.Duration 格式,例如2h30m

  • ISO 8601 持续时间格式,例如PT2H30M, R5/PT1M30S




ttl 是一个可选参数,用于设置计时器/提醒到期和删除的时间或时间间隔。如果省略 ttl,则不应用任何限制。


支持的格式:


  • RFC3339 日期格式,例如2020-10-02T15:00:00Z

  • time.Duration 格式,例如2h30m

  • ISO 8601 持续时间格式,例如PT2H30M




当您同时指定周期内的重复次数和 ttl 时,计时器/提醒将在满足任一条件时停止。

Actor 运行时配置

  • actorIdleTimeout - 停用空闲 actor 之前的超时时间。每个 actorScanInterval 间隔都会检查超时。默认值:60 分钟

  • actorScanInterval - 指定扫描演员以停用空闲 Actor 的频率的持续时间。闲置时间超过 actor_idle_timeout 的 Actor 将被停用。默认值:30 秒

  • drainOngoingCallTimeout - 在耗尽 Rebalanced 的 Actor 的过程中的持续时间。这指定了当前活动 Actor 方法完成的超时时间。如果当前没有 Actor 方法调用,则忽略此项。默认值:60 秒

  • drainRebalancedActors - 如果为 true,Dapr 将等待 drainOngoingCallTimeout 持续时间以允许当前角色调用完成,然后再尝试停用角色。默认值:true

  • drainRebalancedActors与上面的drainOngoingCallTimeout需搭配使用

  • reentrancy - (ActorReentrancyConfig) - 配置角色的重入行为。如果未提供,则禁用可重入。默认值:disabled, 0

  • remindersStoragePartitions - 配置 Actor 提醒的分区数。如果未提供,则所有提醒都将保存为 Actor 状态存储中的单个记录。默认值:0


// In Startup.cspublic void ConfigureServices(IServiceCollection services){    // Register actor runtime with DI    services.AddActors(options =>    {        // Register actor types and configure actor settings        options.Actors.RegisterActor<MyActor>();
// Configure default settings options.ActorIdleTimeout = TimeSpan.FromMinutes(60); options.ActorScanInterval = TimeSpan.FromSeconds(30); options.DrainOngoingCallTimeout = TimeSpan.FromSeconds(60); options.DrainRebalancedActors = true; options.RemindersStoragePartitions = 7; // reentrancy not implemented in the .NET SDK at this time });
// Register additional services for use with actors services.AddSingleton<BankService>();}
复制代码

分区提醒(Preview)

在 sidecar 重新启动后,Actor 提醒会保留并继续触发。在 Dapr 运行时版本 1.3 之前,提醒被保存在 actor 状态存储中的单个记录上。


此为 Preview 功能,感兴趣可以看官方文档

.Net 调用 Dapr 的 Actor

与以往不同,Actor 示例会多创建一个共享类库用于存放 Server 和 Client 共用的部分

创建 Assignment.Shared

创建类库项目,并添加Dapr.ActorsNuGet 包引用,最后添加以下几个类:


AccountBalance.cs


namespace Assignment.Shared;public class AccountBalance{    public string AccountId { get; set; } = default!;
public decimal Balance { get; set; }}
复制代码


IBankActor.cs


:这个是 Actor 接口,IActor 是 Dapr SDK 提供的


using Dapr.Actors;
namespace Assignment.Shared;public interface IBankActor : IActor{ Task<AccountBalance> GetAccountBalance();
Task Withdraw(WithdrawRequest withdraw);}
复制代码


OverdraftException.cs


namespace Assignment.Shared;public class OverdraftException : Exception{    public OverdraftException(decimal balance, decimal amount)        : base($"Your current balance is {balance:c} - that's not enough to withdraw {amount:c}.")    {    }}
复制代码


WithdrawRequest.cs


namespace Assignment.Shared;public class WithdrawRequest{    public decimal Amount { get; set; }}
复制代码

创建 Assignment.Server

创建类库项目,并添加Dapr.Actors.AspNetCoreNuGet 包引用和Assignment.Shared项目引用,最后修改程序端口为 5000。


:Server 与 Shared 和 Client 的 NuGet 包不一样,Server 是集成了服务端的一些功能


修改 program.cs


var builder = WebApplication.CreateBuilder(args);builder.Services.AddSingleton<BankService>();builder.Services.AddActors(options =>{    options.Actors.RegisterActor<DemoActor>();});
var app = builder.Build();
app.UseRouting();
app.UseEndpoints(endpoints =>{ endpoints.MapActorsHandlers();});
app.Run();
复制代码


添加 BankService.cs


using Assignment.Shared;
namespace Assignment.Server;public class BankService{ // Allow overdraft of up to 50 (of whatever currency). private readonly decimal OverdraftThreshold = -50m;
public decimal Withdraw(decimal balance, decimal amount) { // Imagine putting some complex auditing logic here in addition to the basics.
var updated = balance - amount; if (updated < OverdraftThreshold) { throw new OverdraftException(balance, amount); }
return updated; }}
复制代码


添加 BankActor.cs


using Assignment.Shared;using Dapr.Actors.Runtime;using System;
namespace Assignment.Server;public class BankActor : Actor, IBankActor, IRemindable // IRemindable is not required{ private readonly BankService bank;
public BankActor(ActorHost host, BankService bank) : base(host) { // BankService is provided by dependency injection. // See Program.cs this.bank = bank; }
public async Task<AccountBalance> GetAccountBalance() { var starting = new AccountBalance() { AccountId = this.Id.GetId(), Balance = 10m, // Start new accounts with 100, we're pretty generous. };
var balance = await StateManager.GetOrAddStateAsync("balance", starting); return balance; }
public async Task Withdraw(WithdrawRequest withdraw) { var starting = new AccountBalance() { AccountId = this.Id.GetId(), Balance = 10m, // Start new accounts with 100, we're pretty generous. };
var balance = await StateManager.GetOrAddStateAsync("balance", starting)!;
if (balance.Balance <= 0) { // Simulated reminder deposit if (Random.Shared.Next(100) > 90) { await RegisterReminderAsync("Deposit", null, TimeSpan.FromSeconds(5), TimeSpan.FromMilliseconds(-1)); } }
// Throws Overdraft exception if the account doesn't have enough money. var updated = this.bank.Withdraw(balance.Balance, withdraw.Amount);
balance.Balance = updated; await StateManager.SetStateAsync("balance", balance); }
public async Task ReceiveReminderAsync(string reminderName, byte[] state, TimeSpan dueTime, TimeSpan period) { if (reminderName == "Deposit") { var balance = await StateManager.GetStateAsync<AccountBalance>("balance")!;
if (balance.Balance <= 0) { balance.Balance += 60; // 50(Overdraft Threshold) + 10 = 60 Console.WriteLine("Deposit: 10"); } else { Console.WriteLine("Deposit: ignore"); } } }}
复制代码

运行 Assignment.Server

使用 Dapr CLI 来启动,先使用命令行工具跳转到目录 dapr-study-room\Assignment07\Assignment.Server,然后执行下面命令


dapr run --app-id testactor --app-port 5000 --dapr-http-port 3500 --dapr-grpc-port 50001 dotnet run
复制代码

创建 Assignment.Client

创建控制台项目,并添加Dapr.ActorsNuGet 包引用和Assignment.Shared项目引用。


修改 Program.cs


using Assignment.Shared;using Dapr.Actors;using Dapr.Actors.Client;
Console.WriteLine("Creating a Bank Actor");var bank = ActorProxy.Create<IBankActor>(ActorId.CreateRandom(), "BankActor");Parallel.ForEach(Enumerable.Range(1, 10), async i =>{ while (true) { var balance = await bank.GetAccountBalance(); Console.WriteLine($"[Worker-{i}] Balance for account '{balance.AccountId}' is '{balance.Balance:c}'.");
Console.WriteLine($"[Worker-{i}] Withdrawing '{1m:c}'..."); try { await bank.Withdraw(new WithdrawRequest() { Amount = 1m }); } catch (ActorMethodInvocationException ex) { Console.WriteLine("[Worker-{i}] Overdraft: " + ex.Message); }
Task.Delay(1000).Wait(); }});
Console.ReadKey();
复制代码

运行 Assignment.Client

使用 Dapr CLI 来启动,先使用命令行工具跳转到目录 dapr-study-room\Assignment07\Assignment.Client,然后执行下面命令


dotnet run
复制代码

本章源码

Assignment07


https://github.com/doddgu/dapr-study-room

我们正在行动,新的框架、新的生态

我们的目标是自由的易用的可塑性强的功能丰富的健壮的


所以我们借鉴 Building blocks 的设计理念,正在做一个新的框架MASA Framework,它有哪些特点呢?


  • 原生支持 Dapr,且允许将 Dapr 替换成传统通信方式

  • 架构不限,单体应用、SOA、微服务都支持

  • 支持.Net 原生框架,降低学习负担,除特定领域必须引入的概念,坚持不造新轮子

  • 丰富的生态支持,除了框架以外还有组件库、权限中心、配置中心、故障排查中心、报警中心等一系列产品

  • 核心代码库的单元测试覆盖率 90%+

  • 开源、免费、社区驱动

  • 还有什么?我们在等你,一起来讨论


经过几个月的生产项目实践,已完成 POC,目前正在把之前的积累重构到新的开源项目中


目前源码已开始同步到 Github(文档站点在规划中,会慢慢完善起来):


MASA.BuildingBlocks


MASA.Contrib


MASA.Utils


MASA.EShop


BlazorComponent


MASA.Blazor


QQ 群:7424099


微信群:加技术运营微信(MasaStackTechOps),备注来意,邀请进群


MASA 技术团队:(鬼谷子)

发布于: 刚刚阅读数: 2
用户头像

还未添加个人签名 2021.10.26 加入

还未添加个人简介

评论

发布
暂无评论
手把手教你学Dapr - 7. Actors