写点什么

工作流框架 Activiti 中的事务和并发!详细解析工作流中的异步和排他操作

发布于: 2021 年 06 月 06 日
工作流框架Activiti中的事务和并发!详细解析工作流中的异步和排他操作

事务和并发

异步操作

  • Activiti 通过事务方式执行流程,可以根据需求定制

  • Activiti 处理事务:

  • 如果触发了 Activiti 的操作(开始流程,完成任务,触发流程继续执行),activiti 会推进流程,直到每个分支都进入等待状态

  • 抽象的说,会从流程图执行深度优先搜索,如果每个分支都遇到等待状态,就会返回

  • 等待状态是稍后需要执行任务,Activiti 会把当前状态保存到数据库中,然后等待下一次触发

  • 触发可能来自外部,比如用户任务或接收到一个消息,也可能来自 Activiti 本身(定时器事件)

  • 流程包含用户任务,服务任务和定时器事件完成用户任务和校验地址是在同一个工作单元中,两者的成功和失败是原子性的.意味着如果服务任务抛出异常,要回滚当前事务,这样流程会退回到用户任务,用户任务就依然在数据库里这就是 activiti 默认的行为.在(1)中应用或客户端线程完成任务.这会执行服务,流程推进,直到遇到一个等待状态,就是定时器(2),然后它会返回给调用者(3),并提交事务(如果事务是由 Activiti 开启的)

  • 有时需要自定义控制流程中事务的边界,把业务逻辑包裹在一起.这就需要使用异步执行:

  • 完成了用户任务,生成一个发票,把发票发送给客户生成发票不在同一个工作单元内了.如果生成发票出错不需要对用户任务进行回滚 Activiti 实现的是完成用户任务(1),提交事务,返回给调用者应用.然后在后台的线程中,异步执行生成发票.后台线程就是 Activiti 的 Job 执行器(一个线程池)周期对数据库的 Job 进行扫描:当到达"generate invoice"任务,为 Activiti 创建一个稍后执行的 Job"消息",并保存到数据库.Job 会被 Job 执行器获取并执行.也会给本地 Job 执行器一个提醒,告诉有一个新 Job,来增加性能

  • 要想使用这个特性,要使用 activiti:async="true" 扩展


<serviceTask id="service1" name="Generate Invoice" activiti:class="my.custom.Delegate" activiti:async="true" />
复制代码


  • activiti:async 可以使用到以下 BPMN 任务类型中:

  • task

  • serviceTask,

  • scriptTask

  • businessRuleTask

  • sendTask

  • receiveTask

  • userTask

  • subProcess

  • callActivity

  • 对于 userTask,receiveTask 和其他等待状态,异步执行的作用是让开始流程监听器运行在一个单独的线程或者事务中

排他任务

  • 从 Activiti 5.9 开始 ,JobExecutor 能保证同一个流程实例中的 Job 不会并发执行

排他任务的产生背景


  • 一个并行网关,后面有三个服务任务,都设置为异步执行:

  • 这样会添加三个 job 到数据库里.一旦 job 进入数据库,就可以被 jobExecutor 执行了.JobExecutor 会获取 job,代理到工作线程的线程池中,在那里真正执行 job

  • 就是说,使用异步执行,可以把任务分配给这个线程池(在集群环境,可能会使用多个线程池)

  • 产生一致性问题:

  • 考虑一下服务任务后的汇聚:当服务任务完成后,到达并发汇聚节点,需要决定是等待其他分支,还是继续向下执行

  • 就是说,对每个到达并行汇聚的分支,都需要判断是继续还是等待其他分支的一个或多个分支

  • 为什么会产生这样的问题:

  • 因为服务任务配置成使用异步执行,可能相关的 job 都在同一时间被获取,被 JobExecutor 分配给不同的工作线程执行

  • 结果是,三个单独的服务执行使用的事务在到达并发汇聚时可能重叠:

  • 如果出现了这个问题,这些事务是互相不可见的,其他事务同时到达了相同的并发汇聚,假设都在等待其他分支

  • 然而,每个事务都假设在等待其他分支,所以没有分支会越过并发汇聚继续执行,流程实例会一直在等待状态,无法继续执行

  • Activiti 解决这个问题方式:

  • Activiti 使用了乐观锁:

  • 当基于判断的数据看起来不是最新的时候 (因为其他事务可能在提交之前进行了修改,会在每个事务里增加数据库同一行的版本),这个时候,第一个提交的事务会成功,其他会因为乐观锁异常导致失败

  • 这就解决了上面流程的问题:

  • 如果多个分支同步到达并行汇聚,会假设都在登录,并增加父流程的版本号(流程实例)然后尝试提交

  • 第一个分支会成功提交,其他分支会因为乐观锁导致失败

  • 因为流程是被 job 触发的,Activiti 会尝试在等待一段时间后尝试执行同一个 job,这段时间可以同步网关的状态

  • Activiti 乐观锁是一个很好的解决方案吗?

  • 乐观锁允许 Activiti 避免非一致性,确定流程不会"堵在汇聚网关": 或者所有分支都通过网关,或者数据库中的 job 正在尝试通过

  • 虽然这是一个对于持久性和一致性的完美解决方案,但对于上层来说不一定是期望的行为:

  • Activiti 只会对同一个 job 重试估计次数(默认配置为 3).之后,job 还会在数据库里,但是不会再重试了.意味着这个操作必须手工执行 job 的触发

  • 如果 job 有非事务方面的效果,不会因为失败的事务回滚:如果“预定演唱会门票”服务没有与 Activiti 共享事务,重试 job 可能导致我们预定了过多门票

  • 针对这些问题,在 Activiti 中推出了新的概念:排他 job

排他 Job
  • 对于一个流程实例,排他任务不能同时执行两个

  • 考虑上面的流程:如果我们把服务任务申请为排他任务,JobExecutor 会保证对应的 job 不会并发执行.

  • 会保证无论什么时候获取一个流程实例的排他任务,都会把同一个流程实例的其他任务都取出来,放在同一个工作线程中执行.保证 job 是顺序执行的

  • 从 activiti 5.9 开始,排他任务已经是默认配置.所以异步执行和定时器事件默认都是排他任务

  • 如果你想把 job 设置为非排他,可以使用 activiti:exclusive="false" 进行配置:


<serviceTask id="service" activiti:expression="${myService.performBooking(hotel, dates)}" activiti:async="true" activiti:exclusive="false" />
复制代码


  • 排他任务没有性能问题:

  • 在高负载的情况下性能是个问题,高负载意味着 JobExecutor 的所有工作线程都一直在忙碌着

  • 使用排他任务,Activiti 可以简单的分布不同的负载.排他任务意味着同一个流程实例的异步执行会由相同的线程顺序执行

  • 但是要考虑:如果有多个流程实例时.所有其他流程实例的 job 也会分配给其他线程同步执行

  • 意味着虽然 Activiti 不会同时执行一个流程实例的排他 job,但是还会同步执行多个流程实例的异步执行

  • 通过一个总体的预测,在大多数场景下,排他任务都会让单独的实例运行的更迅速.而且,对于同一流程实例中的 job,需要用到的数据也会利用执行的集群节点的缓存.如果任务没有在同一个节点执行,数据就必须每次从数据库重新读取了

流程实例授权

  • 默认所有人在部署的流程定义上启动一个新流程实例,通过流程初始化授权功能定义的用户和组,web 客户端可以限制哪些用户可以启动一个新流程实例

  • Activiti 引擎不会校验授权定义: 这个功能只是为减轻 web 客户端开发者实现校验规则的难度

  • 设置方法与用户任务用户分配类似,用户或组可以使用 activiti:potentialStarter 标签分配为流程的默认启动者:


 <process id="potentialStarter">     <extensionElements>       <activiti:potentialStarter>         <resourceAssignmentExpression>           <formalExpression>group2, group(group3), user(user3)</formalExpression>         </resourceAssignmentExpression>       </activiti:potentialStarter>     </extensionElements>   <startEvent id="theStart"/>   ...
复制代码


user(user3)是直接引用了用户 user3,group(group3)是引用了组 group3.如果没显示设置,默为群组


  • 也可以使用 process 标签的属性 activiti:candidateStarterUsers activiti:candidateStarterGroups


 <process id="potentialStarter" activiti:candidateStarterUsers="user1, user2"                                activiti:candidateStarterGroups="group1">      ...
复制代码


可以同时使用这两个属性


  • 定义流程初始化授权后,开发者可以使用如下方法获得授权定义.可以获得给定的用户能够启动哪些流程定义:


 processDefinitions = repositoryService.createProcessDefinitionQuery().startableByUser("userxxx").list();
复制代码


  • 可以获得指定流程定义设置的潜在启动者对应的 IdentityLink:


 identityLinks = repositoryService.getIdentityLinksForProcessDefinition("processDefinitionId"); 
复制代码


  • 获得可以启动给定流程的用户列表的示例:


  List<User> authorizedUsers =  identityService().createUserQuery().potentialStarter("processDefinitionId").list(); 
复制代码


  • 获得可以启动给定流程配置的群组的示例:


 List<Group> authorizedGroups =  identityService().createGroupQuery().potentialStarter("processDefinitionId").list();  
复制代码

数据对象

  • BPMN 提供了一种功能,可以在流程定义或子流程中定义数据对象

  • 根据 BPMN 规范,流程定义可以包含复杂 XML 结构,可以导入 XSD 定义

  • 对于 Activiti 来说 ,作为 Activiti 首次支持的数据对象, 可以支持如下的 XSD 类型


  <dataObject id="dObj1" name="StringTest" itemSubjectRef="xsd:string"/>
复制代码


  <dataObject id="dObj2" name="BooleanTest" itemSubjectRef="xsd:boolean"/>
复制代码


  <dataObject id="dObj3" name="DateTest" itemSubjectRef="xsd:datetime"/>
复制代码


  <dataObject id="dObj4" name="DoubleTest" itemSubjectRef="xsd:double"/>
复制代码


  <dataObject id="dObj5" name="IntegerTest" itemSubjectRef="xsd:int"/>
复制代码


  <dataObject id="dObj6" name="LongTest" itemSubjectRef="xsd:long"/>
复制代码


  • 数据对象定义会自动转换为流程变量,名称与 name 属性对应

  • 除了数据对象的定义之外,Activiti 支持使用扩展元素来为这个变量赋予默认值:


<process id="dataObjectScope" name="Data Object Scope" isExecutable="true">          <dataObject id="dObj123" name="StringTest123" itemSubjectRef="xsd:string">            <extensionElements>              <activiti:value>Testing123</activiti:value>            </extensionElements>          </dataObject>
复制代码


发布于: 2021 年 06 月 06 日阅读数: 348
用户头像

一位攻城狮的自我修养 2021.04.06 加入

分享技术干货,面试题和攻城狮故事。 你的关注支持是我持续进步的最大动力! https://github.com/ChovaVea

评论

发布
暂无评论
工作流框架Activiti中的事务和并发!详细解析工作流中的异步和排他操作