Troubleshooting an Aggregation Strategy Problem in the Camel Multicast Component

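Abstract: This article walks through a case that exposes a problem with aggregation strategies in the Camel Multicast component. By reading the Camel source code, we identify the cause and present a solution. We hope it helps Camel users who run into the same problem.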


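This article is shared from the Huawei Cloud community post 《使用Apache Camel Multicast组件遇到的一个问题》 ("A Problem Encountered When Using the Apache Camel Multicast Component"), by 中间件小哥.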


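1 Preface

This article is a translation of "ROUTING MULTICAST OUTPUT AFTER ENCOUNTERING PARTIAL FAILURES", published in the Apache Camel community by Reji Mathews of the Huawei Canada research institute. With the original author's consent, a few parts of the original have been slightly modified.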


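2 Introduction to the Multicast Component

Multicast is a powerful EIP component in Apache Camel (hereafter "Camel"). It sends a message to multiple sub-paths, which can then be executed in parallel when parallelProcessing is enabled.

According to the official documentation, the Multicast component can be configured in two ways:

  • Execute all sub-paths independently and use the result of the sub-path that responds last as the final output. This is the default configuration of the Multicast component.

  • Implement Camel's Aggregation Strategy and use a custom aggregator to process the outputs of all sub-paths.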
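The two modes differ only in how the replies are combined. The following plain-Java sketch models that combination as a pairwise fold, with no Camel dependency. `MulticastFoldSketch`, `fold`, `USE_LATEST` and `COLLECT` are hypothetical names chosen for illustration, not Camel APIs. As with the custom aggregator shown later in this article, the combiner receives `null` as the old value on its first call.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BinaryOperator;

final class MulticastFoldSketch {

    // Default-like behaviour: the most recent reply replaces whatever came before.
    static final BinaryOperator<List<String>> USE_LATEST = (oldReplies, newReply) -> newReply;

    // Custom behaviour: keep every reply, in arrival order.
    static final BinaryOperator<List<String>> COLLECT = (oldReplies, newReply) -> {
        if (oldReplies == null) {
            return new ArrayList<>(newReply);
        }
        List<String> merged = new ArrayList<>(oldReplies);
        merged.addAll(newReply);
        return merged;
    };

    // Combines the replies pairwise; the first call sees null as the old value.
    static List<String> fold(List<String> repliesInArrivalOrder, BinaryOperator<List<String>> strategy) {
        List<String> result = null;
        for (String reply : repliesInArrivalOrder) {
            result = strategy.apply(result, Collections.singletonList(reply));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> replies = Arrays.asList("DATA_FROM_PATH_1", "DATA_FROM_PATH_2");
        System.out.println(fold(replies, USE_LATEST)); // only the last reply survives
        System.out.println(fold(replies, COLLECT));    // both replies are kept
    }
}
```

The point to carry forward is that whatever the strategy returns becomes the "old" value of the next call, so the object chosen as the base on the first call matters.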

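3 Problem Description

The case used in this article is as follows: an API is published with the Jetty component. When the API is called, the message is sent to two sub-paths, "direct:A" and "direct:B". After their outputs are processed by a custom aggregation strategy, the route continues with the subsequent steps. "direct:A" throws an exception to simulate a failure, while "direct:B" runs normally. An exception handling policy is also defined in onException.

The Camel version used in this article is 3.8.0.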

@Override
public void configure() throws Exception {
    onException(Exception.class)
        .useOriginalMessage()
        .handled(true)
        .log("Exception handler invoked")
        .transform().constant("{\"data\" : \"err\"}")
        .end();

    from("jetty:http://localhost:8081/myapi?httpMethodRestrict=GET")
        .log("received request")
        .log("Entering multicast")
        .multicast(new SimpleFlowMergeAggregator())
        .parallelProcessing().to("direct:A", "direct:B")
        .end()
        .log("Aggregated results ${body}")
        .log("Another log")
        .transform(simple("{\"result\" : \"success\"}"))
        .end();

    from("direct:A")
        .log("Executing PATH_1 - exception path")
        .transform(constant("DATA_FROM_PATH_1"))
        .log("Starting exception throw")
        .throwException(new Exception("USER INITIATED EXCEPTION"))
        .log("PATH_1")
        .end();

    from("direct:B")
        .log("Executing PATH_2 - success path")
        .delayer(1000)
        .transform(constant("DATA_FROM_PATH_2"))
        .log("PATH_2")
        .end();
}

The custom aggregator SimpleFlowMergeAggregator is defined as follows; it collects the results of all sub-paths into a single list object.

import java.util.ArrayList;
import java.util.List;

import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleFlowMergeAggregator implements AggregationStrategy {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleFlowMergeAggregator.class.getName());

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        LOGGER.info("Inside aggregator " + newExchange.getIn().getBody());
        // First arrival: wrap its body in a list and use this exchange as the base.
        if (oldExchange == null) {
            String data = newExchange.getIn().getBody(String.class);
            List<String> aggregatedDataList = new ArrayList<>();
            aggregatedDataList.add(data);
            newExchange.getIn().setBody(aggregatedDataList);
            return newExchange;
        }

        // Later arrivals: append their body to the list held by the base exchange.
        List<String> oldData = oldExchange.getIn().getBody(List.class);
        oldData.add(newExchange.getIn().getBody(String.class));
        oldExchange.getIn().setBody(oldData);

        return oldExchange;
    }
}


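Based on our understanding of how the Multicast component executes, we expected the following behaviour when there are multiple sub-paths: if at least one sub-path succeeds, the route continues with the subsequent steps using the aggregated result; if all sub-paths fail, the whole route stops. In this case "direct:A" fails and "direct:B" succeeds, so the subsequent log and transform steps should execute normally.

Running the case produces the following log: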


2021-05-06 12:43:18.565 INFO 13956 --- [qtp916897446-42] route1 : received request
2021-05-06 12:43:18.566 INFO 13956 --- [qtp916897446-42] route1 : Entering multicast
2021-05-06 12:43:18.575 INFO 13956 --- [ #4 - Multicast] route2 : Executing PATH_1 - exception path
2021-05-06 12:43:18.575 INFO 13956 --- [ #4 - Multicast] route2 : Starting exception throw
2021-05-06 12:43:18.578 INFO 13956 --- [ #4 - Multicast] route2 : Exception handler invoked
2021-05-06 12:43:18.579 INFO 13956 --- [ #4 - Multicast] c.e.d.m.SimpleFlowMergeAggregator : Inside aggregator {"data" : "err"}
2021-05-06 12:43:19.575 INFO 13956 --- [ #3 - Multicast] route3 : Executing PATH_2 - success path
2021-05-06 12:43:21.576 INFO 13956 --- [ #3 - Multicast] route3 : PATH_2
2021-05-06 12:43:21.576 INFO 13956 --- [ #3 - Multicast] c.e.d.m.SimpleFlowMergeAggregator : Inside aggregator DATA_FROM_PATH_2


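The log shows that after the results of the two sub-paths were aggregated, the subsequent log and transform steps were not executed. This is not what we expected.

After repeated testing, we also found that the subsequent steps are skipped only when the first sub-path to reach the aggregator SimpleFlowMergeAggregator ("direct:A") fails. If the first sub-path ("direct:A") succeeds, the subsequent steps still run even when the other sub-path ("direct:B") fails.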

4 Problem Analysis

Next, we look at the Camel source code to find the cause of this behaviour.

In Pipeline.java in the camel-core-processors module, the run() method contains the following code:

@Override
public void run() {
    boolean stop = exchange.isRouteStop();
    int num = index;
    boolean more = num < size;
    boolean first = num == 0;

    if (!stop && more && (first || continueProcessing(exchange, "so breaking out of pipeline", LOG))) {
        // prepare for next run
        if (exchange.hasOut()) {
            exchange.setIn(exchange.getOut());
            exchange.setOut(null);
        }

        // get the next processor
        AsyncProcessor processor = processors.get(index++);

        processor.process(exchange, this);
    } else {
        // copyResults is needed in case MEP is OUT and the message is not an OUT message
        ExchangeHelper.copyResults(exchange, exchange);

        // logging nextExchange as it contains the exchange that might have altered the payload and since
        // we are logging the completion if will be confusing if we log the original instead
        // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
        if (LOG.isTraceEnabled()) {
            LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
        }

        AsyncCallback cb = callback;
        taskFactory.release(this);
        reactiveExecutor.schedule(cb);
    }
}


Here, the following if condition decides whether the subsequent steps are executed:

if (!stop && more && (first || continueProcessing(exchange, "so breaking out of pipeline", LOG)))


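As the condition shows, the subsequent steps are not executed in any of the following three cases:

  1. An earlier step has marked the exchange object as stopped.

     boolean stop = exchange.isRouteStop();

  2. There are no more steps to execute.

     boolean more = num < size;

  3. The continueProcessing() method returns false. This check is skipped for the first step of the pipeline, because first short-circuits the condition.

Let us look at the code of the continueProcessing() method.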

public final class PipelineHelper {
    public static boolean continueProcessing(Exchange exchange, String message, Logger log) {
        ExtendedExchange ee = (ExtendedExchange) exchange;
        boolean stop = ee.isFailed() || ee.isRollbackOnly() || ee.isRollbackOnlyLast()
                || (ee.isErrorHandlerHandledSet() && ee.isErrorHandlerHandled());
        if (stop) {
            if (log.isDebugEnabled()) {
                StringBuilder sb = new StringBuilder();
                sb.append("Message exchange has failed: ").append(message).append(" for exchange: ").append(exchange);
                if (exchange.isRollbackOnly() || exchange.isRollbackOnlyLast()) {
                    sb.append(" Marked as rollback only.");
                }
                if (exchange.getException() != null) {
                    sb.append(" Exception: ").append(exchange.getException());
                }
                if (ee.isErrorHandlerHandledSet() && ee.isErrorHandlerHandled()) {
                    sb.append(" Handled by the error handler.");
                }
                log.debug(sb.toString());
            }

            return false;
        }
        if (ee.isRouteStop()) {
            if (log.isDebugEnabled()) {
                log.debug("ExchangeId: {} is marked to stop routing: {}", exchange.getExchangeId(), exchange);
            }
            return false;
        }

        return true;
    }
}


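As the code shows, continueProcessing() returns false when an exception occurred during execution and was handled by the exception handler, that is, when isErrorHandlerHandledSet() and isErrorHandlerHandled() both return true.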
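To make the decision easier to follow, here is a minimal plain-Java sketch of the same boolean logic, with the exchange reduced to the flags that PipelineHelper inspects. `ContinueDecisionSketch` and its parameter list are hypothetical simplifications for illustration and are not part of Camel. The second call assumes, as the behaviour described in this article suggests, that an exception handled with handled(true) leaves the "error handler handled" flag set on the exchange.

```java
final class ContinueDecisionSketch {

    // Simplified stand-in for PipelineHelper.continueProcessing().
    // errorHandlerHandled is null when the flag was never set on the exchange.
    static boolean continueProcessing(boolean failed,
                                      boolean rollbackOnly,
                                      boolean rollbackOnlyLast,
                                      Boolean errorHandlerHandled,
                                      boolean routeStop) {
        boolean handled = errorHandlerHandled != null && errorHandlerHandled;
        boolean stop = failed || rollbackOnly || rollbackOnlyLast || handled;
        return !stop && !routeStop;
    }

    public static void main(String[] args) {
        // Clean exchange: the pipeline moves on to the next step.
        System.out.println(continueProcessing(false, false, false, null, false));
        // Exception already handled by the error handler (assumed flag state): the pipeline stops.
        System.out.println(continueProcessing(false, false, false, Boolean.TRUE, false));
    }
}
```

Note that a handled exception stops the pipeline even though the exchange no longer reports itself as failed.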


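Returning to our case: the first sub-path to reach the aggregator SimpleFlowMergeAggregator ("direct:A") becomes the base for the subsequent aggregation, and the other sub-path ("direct:B") appends its body data to that base. In practice, many Camel users implement custom aggregation strategies this way. The approach has a problem, however: during exception handling, a status flag is set on the exchange object of "direct:A", and that flag is carried downstream, where it is used to decide whether the subsequent steps run. Because the exchange object of "direct:A", which serves as the aggregation base, is in the "exception" state, continueProcessing() eventually returns false and the subsequent steps are not executed.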
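The order dependence can be reproduced without a Camel runtime. The sketch below is a simplified model under stated assumptions: `Reply` is a hypothetical stand-in for an exchange that carries only a list body and an "error handled" flag, and `aggregate` mirrors the shape of the original aggregator (first arrival becomes the base). None of these names are Camel APIs.

```java
import java.util.ArrayList;
import java.util.List;

final class BasePropagationSketch {

    // Hypothetical stand-in for an exchange: a list body plus the "error handled" flag.
    static final class Reply {
        final List<String> body = new ArrayList<>();
        final boolean errorHandled;

        Reply(String data, boolean errorHandled) {
            this.body.add(data);
            this.errorHandled = errorHandled;
        }
    }

    // Same shape as the original aggregator: the first arrival becomes the base
    // and later arrivals only contribute their body, so the base's flag survives.
    static Reply aggregate(Reply oldReply, Reply newReply) {
        if (oldReply == null) {
            return newReply;
        }
        oldReply.body.addAll(newReply.body);
        return oldReply;
    }

    // Feeds the replies to aggregate() in arrival order, starting from null.
    static Reply aggregateAll(Reply... arrivals) {
        Reply result = null;
        for (Reply arrival : arrivals) {
            result = aggregate(result, arrival);
        }
        return result;
    }

    public static void main(String[] args) {
        Reply failedFirst = aggregateAll(
                new Reply("{\"data\" : \"err\"}", true),
                new Reply("DATA_FROM_PATH_2", false));
        Reply successFirst = aggregateAll(
                new Reply("DATA_FROM_PATH_2", false),
                new Reply("{\"data\" : \"err\"}", true));

        System.out.println("failed first, flag carried downstream: " + failedFirst.errorHandled);
        System.out.println("success first, flag carried downstream: " + successFirst.errorHandled);
    }
}
```

In both runs the merged body contains the same two entries; only the flag on the returned object differs, which matches the observation that the outcome depends on which sub-path reaches the aggregator first.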


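5 Solution

Users can address this problem in several ways by controlling the state of the exchange object during exception handling. This article uses the following solution: if the first sub-path succeeds, continue with the subsequent steps; if the first sub-path fails, swap it with another sub-path that succeeded and then continue with the subsequent steps.

The updated custom aggregator SimpleFlowMergeAggregator is as follows: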

import java.util.ArrayList;
import java.util.List;

import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
import org.apache.camel.ExtendedExchange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleFlowMergeAggregator implements AggregationStrategy {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleFlowMergeAggregator.class.getName());

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        LOGGER.info("Inside aggregator " + newExchange.getIn().getBody());
        if (oldExchange == null) {
            String data = newExchange.getIn().getBody(String.class);
            List<String> aggregatedDataList = new ArrayList<>();
            aggregatedDataList.add(data);
            newExchange.getIn().setBody(aggregatedDataList);
            return newExchange;
        }

        if (hadException(oldExchange)) {
            if (!hadException(newExchange)) {
                // aggregate and swap the base
                LOGGER.info("Found new exchange with success. swapping the base exchange");
                List<String> oldData = oldExchange.getIn().getBody(List.class);
                oldData.add(newExchange.getIn().getBody(String.class));
                // swapped the base here
                newExchange.getIn().setBody(oldData);
                return newExchange;
            }
        }

        List<String> oldData = oldExchange.getIn().getBody(List.class);
        oldData.add(newExchange.getIn().getBody(String.class));
        oldExchange.getIn().setBody(oldData);

        return oldExchange;
    }

    // Mirrors the stop conditions checked by PipelineHelper.continueProcessing().
    private boolean hadException(Exchange exchange) {
        if (exchange.isFailed()) {
            return true;
        }

        if (exchange.isRollbackOnly()) {
            return true;
        }

        if (exchange.isRollbackOnlyLast()) {
            return true;
        }

        if (((ExtendedExchange) exchange).isErrorHandlerHandledSet()
                && ((ExtendedExchange) exchange).isErrorHandlerHandled()) {
            return true;
        }

        return false;
    }
}

Running the case again produces the following log:


2021-05-06 12:46:19.122 INFO 2576 --- [qtp174245837-45] route1 : received request
2021-05-06 12:46:19.123 INFO 2576 --- [qtp174245837-45] route1 : Entering multicast
2021-05-06 12:46:19.130 INFO 2576 --- [ #3 - Multicast] route2 : Executing PATH_1 - exception path
2021-05-06 12:46:19.130 INFO 2576 --- [ #3 - Multicast] route2 : Starting exception throw
2021-05-06 12:46:19.134 INFO 2576 --- [ #3 - Multicast] route2 : Exception handler invoked
2021-05-06 12:46:19.135 INFO 2576 --- [ #3 - Multicast] c.e.d.m.SimpleFlowMergeAggregator : Inside aggregator {"data" : "err"}
2021-05-06 12:46:20.130 INFO 2576 --- [ #4 - Multicast] route3 : Executing PATH_2 - success path
2021-05-06 12:46:22.132 INFO 2576 --- [ #4 - Multicast] route3 : PATH_2
2021-05-06 12:46:22.132 INFO 2576 --- [ #4 - Multicast] c.e.d.m.SimpleFlowMergeAggregator : Inside aggregator DATA_FROM_PATH_2
2021-05-06 12:46:22.132 INFO 2576 --- [ #4 - Multicast] c.e.d.m.SimpleFlowMergeAggregator : Found new exchange with success. swapping the base exchange
2021-05-06 12:46:22.133 INFO 2576 --- [ #4 - Multicast] route1 : Aggregated results {"data" : "err"},DATA_FROM_PATH_2
2021-05-06 12:46:22.133 INFO 2576 --- [ #4 - Multicast] route1 : Another log


As the log shows, with the new custom aggregation strategy the subsequent log and transform steps are both executed.
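The swap rule can also be checked in isolation, including the case where every sub-path fails. The following plain-Java sketch is a simplified model, not the aggregator itself: `SwapBaseSketch` and `Reply` are hypothetical names, and `Reply` reduces an exchange to a list body and a single "error handled" flag.

```java
import java.util.ArrayList;
import java.util.List;

final class SwapBaseSketch {

    // Hypothetical stand-in for an exchange: a list body plus the "error handled" flag.
    static final class Reply {
        List<String> body = new ArrayList<>();
        final boolean errorHandled;

        Reply(String data, boolean errorHandled) {
            this.body.add(data);
            this.errorHandled = errorHandled;
        }
    }

    // Merge the bodies as before, but if the current base failed and the new
    // arrival succeeded, hand the merged body to the new arrival and return it.
    static Reply aggregate(Reply oldReply, Reply newReply) {
        if (oldReply == null) {
            return newReply;
        }
        oldReply.body.addAll(newReply.body);
        if (oldReply.errorHandled && !newReply.errorHandled) {
            newReply.body = oldReply.body; // swap the base
            return newReply;
        }
        return oldReply;
    }

    // Feeds the replies to aggregate() in arrival order, starting from null.
    static Reply aggregateAll(Reply... arrivals) {
        Reply result = null;
        for (Reply arrival : arrivals) {
            result = aggregate(result, arrival);
        }
        return result;
    }

    public static void main(String[] args) {
        Reply partialFailure = aggregateAll(new Reply("err", true), new Reply("ok", false));
        Reply totalFailure = aggregateAll(new Reply("err1", true), new Reply("err2", true));

        System.out.println("partial failure stops the route: " + partialFailure.errorHandled);
        System.out.println("total failure stops the route: " + totalFailure.errorHandled);
    }
}
```

Under this model a failed base is replaced as soon as any successful reply arrives, while a run in which every reply failed still returns a flagged base, which is consistent with the expectation stated earlier that the route should stop only when all sub-paths fail.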

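6 Conclusion

This article used a case to expose a problem with aggregation strategies in the Camel Multicast component. By reading the Camel source code, we identified the cause and presented a solution.

We hope it helps Camel users who run into the same problem.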

