什么是触发器
可以直接调用流程引擎的 IWorkflowRuntime 获取 IWorkflowClient,然后调用它的 CreateAndRunInstanceAsync 来启动一个全新的流程。
也可以让流程引擎监听一个事件,当事件触发时,自动创建并执行(或借助书签恢复)一个流程实例,这就是触发器。比如 定义一个触发器,当指定文件变化时,自动启动一个指定的流程。 再比如 定义一个触发器,每隔 5 分钟自动触发执行某个流程。
在流程定义中配置触发器
elsa 提供代码或可视化设计器的方式定义流程,由于触发器仅仅是一个特殊的 Activity,所以一样的,通过代码或在设计器拖拽触发器到流程定义中即可。
不同类型的触发器需要配置不同参数,如:elsa 内置的 StartAt,它表示在指定时间点自动触发执行,所以需要设置它的 DateTime,表示在这个时间点自动触发再比如:HttpEndpoint 是另一个 elsa 内置的触发器,它监听到指定请求时自动触发,所以需要配置它的 监听地址、Http 方法、是否做授权判断等等属性。
触发器存储(索引化)
触发器都是定义在流程定义中的,且一个流程定义中,可能有多个相同或不同类型的触发器, 从所有流程定义中把触发器都抽取出来,单独存储到一个列表中,当系统启动时,或别的情况需要访问整个系统中的触发器配置时,可以直接从这个列表中快速获取触发器, 这比每次都遍历所有流程定义,再从中抽取触发器更快,这就是触发器索引化,如果你用的 ef 配置为 elsa 的持久化,那么它会存储在 Triggers 表中
触发器索引化器由 ITriggerIndexer 接口表示,默认实现是 TriggerIndexer,它就提供保存、删除、获取触发器的功能。 它在保存时会根据流程或流程定义,获取里面定义的触发器列表,然后调用其 GetTriggerPayloadsAsync 方法,获取触发器配置时的参数,这个参数通常是根据触发器属性生成的, 比较特别的是某些触发器中,GetTriggerPayloadsAsync 会返回多个 payload,这会导致触发器索引列表中存储多个记录,比如内置触发器 HttpEndpoint,会根据用户配置的多个 Http 方法, 返回多个数据,如果你配置了 GET POST,触发器索引列表会存储对应的两条记录,将来外部请求同一个 url 地址时,无论是 get 还是 post,HttpEndpoint 这个节点都会被执行。
流程定义变动后发布流程时,或直接刷新流程定义时,或其它情况,总之流程定义变化后,都会调用 ITriggerIndexer 重新生产并保存触发器,保存后会触发 WorkflowTriggersIndexed 事件。
所以索引器还起到一个奇怪的作用,就是让我们在流程定义中配置触发器相关参数,而配合触发器的外部监听功能可以通过从持久化获取,或从事件参数中 获得 触发器的配置数据,从而控制监听逻辑。
触发器外部的监听部分
监听这件事并不是定义在触发器节点内部的,而是外部配合的,比如 HttpEndpont 触发器,监听是单独的 asp.net core 中间件来实现的,但这个中间件应该依赖我们配置流程时给 HttpEndpont 触发器定义的参数。
配合 HttpPoint 触发的外部部分有个 UpdateRouteTable,它监听 WorkflowTriggersIndexed,并根据事件参数获取监听的地址,进而配置路由。另外 asp.net core 中间件中还可以直接从持久化中获取触发器, 进而访问器 payload 中的触发器配置参数,并根据这些参数控制此中间件的执行流程。
而配合定时器相关触发器 Timer StartAt Cron 等的外部分是 ScheduleWorkflows,它也监听 WorkflowTriggersIndexed 事件,在事件处理中,调用 elsa 调度器安排后台作业来,以实现到指定时间后让触发器执行。
触发器节点被执行
触发器是特殊的 Activity,假如有个流程:A → B → C,其中 B 是触发器,当前流程可能并不是因为 B 的外部监听触发此流程的执行,可能是 A 执行后,流转到 B,导致 B 的 ExecuteAsync 被执行。ActivityExecutionContext.IsTriggerOfWorkflow 就是用来判断这种情况的,若当前流程就是自己这个节点触发的,则为 true,否则为 false 所以触发器执行时 ExecuteAsync 方法中通常需要判断这两种情况。
内置 HttpEndpoint 触发器分析
这里分析下内置的 HttpEndpoint 触发器,但仅关注触发器的原理部分,以帮助我们更深刻地理解触发器的工作原理。 它定义在 Elsa.Http 模块中,它继承至Trigger<HttpRequest>
与触发器相关输入参数
HttpEndpoint.GetTriggerPayloads
核心源码:
protected override IEnumerable<object> GetTriggerPayloads(TriggerIndexingContext context) => GetBookmarkPayloads(context.ExpressionExecutionContext);
private IEnumerable<object> GetBookmarkPayloads(ExpressionExecutionContext context)
{
// Generate bookmark data for path and selected methods.
var normalizedRoute = context.Get(Path)!.NormalizeRoute();
var methods = SupportedMethods.GetOrDefault(context) ?? new List<string> { HttpMethods.Get };
var authorize = Authorize.GetOrDefault(context);
var policy = Policy.GetOrDefault(context);
var requestTimeout = RequestTimeout.GetOrDefault(context);
var requestSizeLimit = RequestSizeLimit.GetOrDefault(context);
//根据http请求方法,返回多个数据,会在触发器索引列表中创建多条记录
return methods
.Select(x => new HttpEndpointBookmarkPayload(normalizedRoute, x.ToLowerInvariant(), authorize, policy, requestTimeout, requestSizeLimit))
.Cast<object>()
.ToArray();
}
复制代码
在所在流程被发布时,会调用 GetTriggerPayloads 方法,而它会返回上述输入参数,这些输入参数最终被保存到数据库中,还会触发 WorkflowTriggersIndexed 事件,这些监听相关的 参数还会保存到这个事件的参数中。
这个方法会根据配置的 SupportedMethods 返回一个或多个对象,最终导致触发器索引列表中出现多条对应记录。
UpdateRouteTable
它监听 WorkflowTriggersIndexed 事件,从事件参数中获取 Path,然后更新 elsa 路由表
public class UpdateRouteTable(IRouteTableUpdater routeTableUpdater, IOptions<HttpActivityOptions> options) :
INotificationHandler<WorkflowTriggersIndexed>,
INotificationHandler<WorkflowBookmarksIndexed>
{
/// <inheritdoc />
public async Task HandleAsync(WorkflowTriggersIndexed notification, CancellationToken cancellationToken)
{
routeTableUpdater.RemoveRoutes(notification.IndexedWorkflowTriggers.RemovedTriggers);
await routeTableUpdater.AddRoutesAsync(notification.IndexedWorkflowTriggers.AddedTriggers, cancellationToken);
await routeTableUpdater.AddRoutesAsync(notification.IndexedWorkflowTriggers.UnchangedTriggers, cancellationToken);
}
复制代码
HttpWorkflowsMiddleware
elsa http endpoint 监听中间件,直接看注释吧
public async Task InvokeAsync(HttpContext httpContext, IServiceProvider serviceProvider)
{
//当前请求路径
var path = GetPath(httpContext);
//根据elsa路由表匹配路由数据
var matchingPath = GetMatchingRoute(serviceProvider, path).Route;
//配置elsa时,HttpActivityOptions中指定的基础地址
var basePath = options.Value.BasePath?.ToString().NormalizeRoute();
//若请求地址连elsa配置的基础地址都不匹配,则直接执行下个中间件,说明没见听到触发器定义的要求
// If the request path does not match the configured base path to handle workflows, then skip.
if (!string.IsNullOrWhiteSpace(basePath))
{
if (!path.StartsWith(basePath, StringComparison.OrdinalIgnoreCase))
{
await next(httpContext);
return;
}
// Strip the base path.
matchingPath = matchingPath[basePath.Length..];
}
matchingPath = matchingPath.NormalizeRoute();
var input = new Dictionary<string, object>
{
[HttpEndpoint.HttpContextInputKey] = true,
[HttpEndpoint.RequestPathInputKey] = path.NormalizeRoute()
};
var cancellationToken = httpContext.RequestAborted;
var request = httpContext.Request;
var method = request.Method.ToLowerInvariant();
var httpWorkflowLookupService = serviceProvider.GetRequiredService<IHttpWorkflowLookupService>();
var workflowInstanceId = await GetWorkflowInstanceIdAsync(serviceProvider, httpContext, cancellationToken);
var correlationId = await GetCorrelationIdAsync(serviceProvider, httpContext, cancellationToken);
//根据请求路径 http方法 和 HttpEndpoint计算hash值
var bookmarkHash = ComputeBookmarkHash(serviceProvider, matchingPath, method);
//根据上面的hash值,从存储中获取匹配的工作流及其触发器列表
var lookupResult = await httpWorkflowLookupService.FindWorkflowAsync(bookmarkHash, cancellationToken);
if (lookupResult != null)
{
//若找到了流程,且里面仅包含一个与当前请求匹配的触发器,则说了匹配上了,执行流程,否则报错
var triggers = lookupResult.Triggers;
if (triggers.Count > 1)
{
//报错
await HandleMultipleWorkflowsFoundAsync(httpContext, () => triggers.Select(x => new
{
x.WorkflowDefinitionId
}), cancellationToken);
return;
}
var trigger = triggers.FirstOrDefault();
if (trigger != null)
{
var workflowGraph = lookupResult.WorkflowGraph!;
//执行流程中触发器所在节点
await StartWorkflowAsync(httpContext, trigger, workflowGraph, input, workflowInstanceId, correlationId);
return;
}
}
//若触发器节点已经执行过,也就是之前从其它节点流转过去的,那次触发器节点被执行时会创建书签的,则直接根据书签恢复执行
var bookmarks = await FindBookmarksAsync(serviceProvider, bookmarkHash, workflowInstanceId, correlationId, cancellationToken).ToList();
//若找到多个匹配的,报错
if (bookmarks.Count > 1)
{
await HandleMultipleWorkflowsFoundAsync(httpContext, () => bookmarks.Select(x => new
{
x.WorkflowInstanceId
}), cancellationToken);
return;
}
var bookmark = bookmarks.SingleOrDefault();
if (bookmark != null)
{
//恢复书签执行
await ResumeWorkflowAsync(httpContext, bookmark, input, correlationId);
return;
}
// 如果基础地址都匹配上了,却没找到对应的流程,则抛出404错误
if (basePath != null)
{
await httpContext.Response.SendNotFoundAsync(cancellation: cancellationToken);
return;
}
// If no base path was configured, the request should be handled by subsequent middlewares.
await next(httpContext);
}
复制代码
HttpEndpoint.ExecuteAsync
若前面的中间件匹配上当前触发器节点
protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)
{
var path = Path.Get(context);
if (path.Contains("//"))
throw new RoutePatternException(path, "Path cannot contain double slashes (//)");
//如果本次执行不是由当前HttpEndpoint自己触发的,比如在当前节点前另一个触发器执行了此流程,但是节点流转到这里来了。
if (!context.IsTriggerOfWorkflow())
{
//则直接创建书签卡住流程,等到匹配的http请求被HttpWorkflowsMiddleware流转到这里时,OnResumeAsync将被执行
context.CreateBookmarks(GetBookmarkPayloads(context.ExpressionExecutionContext), includeActivityInstanceId: false, callback: OnResumeAsync);
return;
}
//否则,说明当前流程的执行,就是这里配置的触发器触发的。
var httpContextAccessor = context.GetRequiredService<IHttpContextAccessor>();
var httpContext = httpContextAccessor.HttpContext;
//触发器也是activity,它可能直接被执行,而不是被http请求执行,则需要等待传统的书签请求来恢复OnResumeAsync
if (httpContext == null)
{
// We're executing in a non-HTTP context (e.g. in a virtual actor).
// Create a bookmark to allow the invoker to export the state and resume execution from there.
context.CreateBookmark(OnResumeAsync, BookmarkMetadata.HttpCrossBoundary);
return;
}
//否则说明当前流程的触发器就是自己,并且被http请求触发了
await HandleRequestAsync(context, httpContext);
}
复制代码
HttpEndpoint.OnResumeAsync
无论时 HttpWorkflowsMiddleware 匹配上,通过书签恢复流程执行;还是走的传统的书签恢复,都会执行这里。
private async ValueTask OnResumeAsync(ActivityExecutionContext context)
{
var httpContextAccessor = context.GetRequiredService<IHttpContextAccessor>();
var httpContext = httpContextAccessor.HttpContext;
//在恢复执行时,可能并不是http请求恢复的,可能是直接调用书签恢复的
if (httpContext == null)
{
// We're executing in a non-HTTP context (e.g. in a virtual actor).
// Create a bookmark to allow the invoker to export the state and resume execution from there.
context.CreateBookmark(OnResumeAsync, BookmarkMetadata.HttpCrossBoundary);
return;
}
//处理http请求
await HandleRequestAsync(context, httpContext);
}
复制代码
触发器调度
有几个跟时间相关的内置触发器:Cron、StartAt、Timer,它们使用 elsa 的工作流调度框架,在后台作业中,根据设置的时间规则,触发执行流程。
触发器调度器 ITriggerScheduler
ITriggerScheduler 它定义两个方法,调度触发器、注销触发器调度。 默认实现 DefaultTriggerScheduler,它使用 elsa 的流程调度器,实现在后台作业中实现流程安排。 值得注意的是,它们在触发时,DefaultTriggerScheduler 总是创建新的流程实例。核心源码如下:
public async Task ScheduleAsync(IEnumerable<StoredTrigger> triggers, CancellationToken cancellationToken = default)
{
var triggerList = triggers.ToList();
var timerTriggers = triggerList.Filter<Activities.Timer>();
var startAtTriggers = triggerList.Filter<StartAt>();
var cronTriggers = triggerList.Filter<Cron>();
var now = systemClock.UtcNow;
// Schedule each Timer trigger.
foreach (var trigger in timerTriggers)
{
var (startAt, interval) = trigger.GetPayload<TimerTriggerPayload>();
var input = new { StartAt = startAt, Interval = interval }.ToDictionary();
//安排流程作业时,要求创建新的流程实例
var request = new ScheduleNewWorkflowInstanceRequest
{
WorkflowDefinitionHandle = WorkflowDefinitionHandle.ByDefinitionVersionId(trigger.WorkflowDefinitionVersionId),
TriggerActivityId = trigger.ActivityId,
Input = input
};
await workflowScheduler.ScheduleRecurringAsync(trigger.Id, request, startAt, interval, cancellationToken);
}
// Schedule each StartAt trigger.
foreach (var trigger in startAtTriggers)
{
var executeAt = trigger.GetPayload<StartAtPayload>().ExecuteAt;
// If the trigger is in the past, log info and skip scheduling.
if (executeAt < now)
{
logger.LogInformation("StartAt trigger is in the past. TriggerId: {TriggerId}. ExecuteAt: {ExecuteAt}. Skipping scheduling", trigger.Id, executeAt);
continue;
}
var input = new { ExecuteAt = executeAt }.ToDictionary();
//安排流程作业时,要求创建新的流程实例
var request = new ScheduleNewWorkflowInstanceRequest
{
WorkflowDefinitionHandle = WorkflowDefinitionHandle.ByDefinitionVersionId(trigger.WorkflowDefinitionVersionId),
TriggerActivityId = trigger.ActivityId,
Input = input
};
await workflowScheduler.ScheduleAtAsync(trigger.Id, request, executeAt, cancellationToken);
}
// Schedule each Cron trigger.
foreach (var trigger in cronTriggers)
{
var payload = trigger.GetPayload<CronTriggerPayload>();
var cronExpression = payload.CronExpression;
if (string.IsNullOrWhiteSpace(cronExpression))
{
logger.LogWarning("Cron expression is empty. TriggerId: {TriggerId}. Skipping scheduling of this trigger", trigger.Id);
continue;
}
var input = new { CronExpression = cronExpression }.ToDictionary();
//安排流程作业时,要求创建新的流程实例
var request = new ScheduleNewWorkflowInstanceRequest
{
WorkflowDefinitionHandle = WorkflowDefinitionHandle.ByDefinitionVersionId(trigger.WorkflowDefinitionVersionId),
TriggerActivityId = trigger.ActivityId,
Input = input
};
try
{
await workflowScheduler.ScheduleCronAsync(trigger.Id, request, cronExpression, cancellationToken);
}
catch (FormatException ex)
{
logger.LogWarning(ex, "Cron expression format error. CronExpression: {CronExpression}", cronExpression);
}
}
}
复制代码
ScheduleWorkflows 监听触发器变动事件,并进行触发器调度。
public class ScheduleWorkflows : INotificationHandler<WorkflowTriggersIndexed>, INotificationHandler<WorkflowBookmarksIndexed>
{
//...其它代码
public async Task HandleAsync(WorkflowTriggersIndexed notification, CancellationToken cancellationToken)
{
//注销之前的后台作业
await _triggerScheduler.UnscheduleAsync(notification.IndexedWorkflowTriggers.RemovedTriggers, cancellationToken);
//使用后台作业,在指定时间点恢复触发器所在节点
await _triggerScheduler.ScheduleAsync(notification.IndexedWorkflowTriggers.AddedTriggers, cancellationToken);
}
复制代码
以 StartAt 触发器为例
这个相对简单,发布流程时触发触发器所以变更事件
protected override object GetTriggerPayload(TriggerIndexingContext context)
{
//从输入参数中获取payload
var executeAt = context.ExpressionExecutionContext.Get(DateTime);
//返回,以供触发器调度器访问
return new StartAtPayload(executeAt);
}
/// <inheritdoc />
protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)
{
//若当前流程的执行,正是当前触发器导致执行的,则直接完成,因为此时是时间到了,当前方法第二次被执行。
if (context.IsTriggerOfWorkflow())
{
await context.CompleteActivityAsync();
return;
}
//否则说明是另一个触发器导致流程执行,并流转到这里,这个时候应该去调度任务。
//从输入参数中获取指定的触发时间
var executeAt = context.ExpressionExecutionContext.Get(DateTime);
var clock = context.ExpressionExecutionContext.GetRequiredService<ISystemClock>();
var now = clock.UtcNow;
var logger = context.GetRequiredService<ILogger<StartAt>>();
context.JournalData.Add("Executed At", now);
if (executeAt <= now)
{
logger.LogDebug("Scheduled trigger time lies in the past ('{Delta}'). Completing immediately", now - executeAt);
await context.CompleteActivityAsync();
return;
}
//书签持久化中间件会保存书签,并触发事件,事件处理器会调度此书签,并用到这个参数
var payload = new StartAtPayload(executeAt);
//书签持久化中间件会保存书签,并且触发书签变更事件,进而事件处理器调度书签去安排任务
//书签调度器会根据类型StartAtPayload去做调度
//关于调度需要看单独的章节。
context.CreateBookmark(payload);
}
复制代码
文章转载自:变形精怪
原文链接:https://www.cnblogs.com/jionsoft/p/18675990
体验地址:http://www.jnpfsoft.com/?from=001YH
评论