Camunda外部任务-使用弹性和可扩展架构构建应用程序的强大工具

图片

在Tinkoff中,我们使用Camunda + Spring框架来开发业务流程自动化系统我们以流程图的形式使用BPMN(业务流程管理符号)描述业务流程。

我们的图中最常用的元素是服务任务(齿轮矩形)。Camunda支持两种执行服务任务的方式

  1. 使用对Java代码的同步调用。
  2. 创建一个外部任务。

第二种方法允许您使用外部系统执行任务-例如,如果您需要从另一个应用程序调用一个camunda应用程序,甚至需要将工作委托给任何外部系统。

图片
BPMN图示例

当您打算在多个应用程序中重用逻辑时,这很有用。或者,当您想要坚持微服务架构时。例如,将处理业务流程的服务与实现技术任务(例如生成报告或邮件)的服务分开。

与此功能一起,外部任务提供了可伸缩的弹性体系结构。要了解为什么会发生这种情况,您首先需要了解外部任务如何BPMN和应用程序级别工作。

BPMN中的外部任务


外部任务涉及创建可由外部处理程序执行的任务。外部任务模式的本质是:

  1. , «» , «».
  2. camunda , , .
  3. camunda (/).

在上图中,我描述了一个虚构的过程,在该过程中,我们希望获取用户列表,向他们发送广告,并在2小时后计算出营销时事通讯之后的申请数量。并且,如果有10个以上的应用程序,请增加下一封邮件的选择范围。

我希望我的camunda应用程序仅负责业务流程,而其他任何应用程序均应负责电子邮件。在这种情况下,外部任务模式对我来说很好在我的过程中,我将只创建一个电子邮件通讯任务,并等待某些外部处理程序完成它。

要在图中创建外部任务,您必须:

  1. 创建一个常规任务
  2. 将其类型更改为服务任务
  3. 将实现设置为external
  4. 指定主题字段的值

图片

主题是队列的名称,一种类型的任务将添加到该队列中,外部处理程序将向该队列订阅。

现在,该过程中有一个外部任务,您可以启动它,但是由于没有人在处理它,因此它不会被执行。

外部任务工作者


外部任务 模式的好处在于,它允许您使用可以执行HTTP请求的任何工具以任何语言实现任务处理

以下是来自camunda博客的示例。该示例实现了一个外部javascript处理程序,该处理程序每20秒从camunda请求一个处理任务列表。如果有任务,则将其发送出去,并通知camunda有关任务的完成情况。

const baseUrl = 'http://localhost:8080/my-app/rest';
const workerSettings = {
 workerId: 'worker01', // some unique name for the current worker instance
 maxTasks: 5,
 topics: [
   {
     topicName: 'sendEmail',
     lockDuration: 10000, // How much time the worker thinks he needs to process the task
     variables: ['video'] // Which variables should be returned in the response (to avoid additional REST calls to read data)
   }]};
const requestParams = {method: 'POST', headers: {contentType: 'application/json'}};

function pollExternalTasks() {
 return fetch(`${baseUrl}/external-task/fetchAndLock`, {
   ...requestParams,
   body: JSON.stringify(workerSettings)
 })
}

function processExternalTask(result = []) {
 return Promise.all(result.map(externalTask => {
   sendEmail(externalTask); // Here the actual work would be done

   return fetch(`${baseUrl}/external-task/${externalTask.id}/complete`, {
     ...requestParams,
     body: JSON.stringify({workerId: workerSettings.workerId}),
   })
 }));
}

setInterval(() => {
 pollExternalTasks().then(processExternalTask)
}, 20000);

从上面的代码中可以看到,处理外部任务的关键方法fetchAndLockcomplete第一种方法请求任务列表并确保其执行安全,第二种方法通知任务已完成。除了这两种方法外,还有其他方法,您可以在官方文档中阅读有关它们的信息

Camunda外部任务客户端


图片

为了实现外部任务处理,camunda提供了JavascriptJava客户端,使您可以在几行中创建外部任务处理程序。还有一个详细的指南,描述了处理外部任务的基本原理-再次以JavascriptJava为例

使用ExternalTask​​Client的外部处理程序的示例实现

public class App {
   public static void main(String... args) {
       // bootstrap the client
       ExternalTaskClient client = ExternalTaskClient.create()
           .baseUrl("http://localhost:8080/engine-rest")
           .asyncResponseTimeout(1000)
           .build();

       // subscribe to the topic
       client.subscribe("sendEmail").handler((externalTask, externalTaskService) -> {
           try {
               String result = sendEmail(externalTask)
               Map<String, Object> variables = new HashMap<>();

               variables.put("result", result);
               externalTaskService.complete(externalTask, variables);
               System.out.println("The External Task " + externalTask.getId() + " has been completed!");
           } catch (e: Exception) {
               externalTaskService.handleFailure(externalTask, e.message, e.stackTrace.toString())
           }
       }).open();
   }
}

如果您的任务不仅需要执行一些同步操作,还需要启动整个过程,那么您可以执行此操作,例如,通过RuntimeService启动该过程

@Service
class EmailWorker(
   private val runtimeService: RuntimeService
) {
   val builder = ExternalTaskClientBuilderImpl().baseUrl("http://localhost:8080").workerId("myWorker")
   val taskClient = builder.build()
   val engineClient = (builder as ExternalTaskClientBuilderImpl).engineClient

   @PostConstruct
   fun init() {
       taskClient
           .subscribe("sendEmail")
           .lockDuration(10000)
           .handler { externalTask, externalService ->
               runtimeService.startProcessInstanceByKey(
                   "SendEmailProcess",
                   externalTask.getVariable("emailId"),
                   mapOf(
                       "text" to externalTask.getVariable("text"),
                       "email" to externalTask.getVariable("email")
                   )
               )
           }
           .open()
   }


   @PreDestroy
   fun destroy() {
       taskClient.stop()
   }
}

// Delegate from SendEmailProcess process
@Component
class EmailResultDelegate(private val emailWorker: EmailWorker) {
   fun doExecute(execution: DelegateExecution) {
       emailWorker.engineClient.complete(
           execution.readVar(EXTERNAL_TASK_ID),
           mapOf("result" to "Success")
       )
   }
}

在此示例中,外部任务处理程序(EmailWorker)在收到任务后,启动SendEmailProcess进程

想象一下,该过程执行了一些发送通讯的必要动作,最后调用了EmailResultDelegate,该电子邮件反过来完成了外部任务

外部任务的架构优势


值得注意的是,有一种方法可以以更简单的方式在另一个camunda应用程序启动该过程POST:/ rest / process-definition / key / $ {id} / start

使用REST时,您没有任何事务保证。但是毕竟,我们还通过REST处理外部任务,那么有什么区别? 不同之处在于我们不直接调用外部服务,而只是发布可以处理的任务。考虑一个示例: 某个外部处理程序采用了现在分配给它的任务,但是当收到响应时,连接断开。现在在camunda一边



图片

不会处理的任务被阻止,因为外部处理程序未收到响应。但这并不令人恐惧:在camunda中执行外部任务时,超时将使任务再次返回队列,其他人可以处理该超时。

现在,让我们看一下外部处理程序接收任务,完成任务并调用complete方法的情况,该任务由于网络问题而失败。现在您将无法了解任务是否在camunda中成功完成。您可以重试,但是网络问题可能还会继续。

在这种情况下,最好的解决方案是忽略该问题。如果任务成功完成,则一切正常。如果没有,则在超时后,任务将再次可供处理。但这意味着您的外部处理程序必须是幂等的,或包含对任务进行重复数据删除的逻辑。

开始新流程时可能会发生类似的问题,因此您应该检查具有相同数据的现有实例,例如businessKey

除了高容错能力外,任务使您可以轻松扩展外部处理程序并以任何编程语言实现它们。同时,该模式有助于实现微服务,以便它们尽可能少地相互影响,从而提高其稳定性。

有关外部任务的更多信息
https : //docs.camunda.org/manual/latest/user-guide/process-engine/external-tasks/
https://docs.camunda.org/manual/latest/reference/rest/external -task /
https://docs.camunda.org/manual/latest/user-guide/ext-client/

All Articles