使用Apache Camel Multicast組件遇到的一個問題(使用apache部署web網站)

      網友投稿 1195 2022-05-30

      1 前言

      2 Multicast組件簡介

      Multicast是Apache Camel(以下簡稱“Camel”)中一個功能強大的EIP組件,可以將消息發送至多條子路徑,然后并行地執行它們。

      參考官網文檔,我們可以使用兩種方式配置Multicast組件:

      獨立執行所有子路徑,并將最后響應的子路徑的結果作為最終輸出。這也是Multicast組件的默認配置。

      通過實現Camel的聚合策略(AggregationStrategy),使用自定義的聚合器來處理所有子路徑的輸出。

      3 問題描述

      本文使用案例如下:使用Jetty組件發布一個API,調用該API后,消息會分別發送至"direct:A"和"direct:B"兩條子路徑。在使用自定義的聚合策略處理后,繼續執行后續步驟。其中在"direct:A"中拋出一個異常,來模擬運行失敗;"direct:B"正常運行。同時在onException中定義了異常處理策略。

      本文使用的Camel版本為3.8.0

      @Override

      public void configure() throws Exception {

      onException(Exception.class)

      .useOriginalMessage()

      .handled(true)

      .log("Exception handler invoked")

      .transform().constant("{\"data\" : \"err\"}")

      .end();

      from("jetty:http://localhost:8081/myapi?httpMethodRestrict=GET")

      .log("received request")

      .log("Entering multicast")

      .multicast(new SimpleFlowMergeAggregator())

      .parallelProcessing().to("direct:A", "direct:B")

      .end()

      .log("Aggregated results ${body}")

      .log("Another log")

      .transform(simple("{\"result\" : \"success\"}"))

      .end();

      from("direct:A")

      .log("Executing PATH_1 - exception path")

      .transform(constant("DATA_FROM_PATH_1"))

      .log("Starting exception throw")

      .throwException(new Exception("USER INITIATED EXCEPTION"))

      .log("PATH_1")

      .end();

      from("direct:B")

      .log("Executing PATH_2 - success path")

      .delayer(1000)

      .transform(constant("DATA_FROM_PATH_2"))

      .log("PATH_2")

      .end();

      }

      自定義聚合器SimpleFlowMergeAggregator定義如下,其中我們將所有子路徑的結果放入一個list對象。

      public class SimpleFlowMergeAggregator implements AggregationStrategy {

      private static final Logger LOGGER = LoggerFactory.getLogger(SimpleFlowMergeAggregator.class.getName());

      @Override

      public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {

      LOGGER.info("Inside aggregator " + newExchange.getIn().getBody());

      if(oldExchange == null) {

      String data = newExchange.getIn().getBody(String.class);

      List aggregatedDataList = new ArrayList<>();

      aggregatedDataList.add(data);

      newExchange.getIn().setBody(aggregatedDataList);

      return newExchange;

      }

      List oldData = oldExchange.getIn().getBody(List.class);

      oldData.add(newExchange.getIn().getBody(String.class));

      oldExchange.getIn().setBody(oldData);

      return oldExchange;

      }

      }

      基于對Multicast組件執行邏輯的理解,我們認為存在多個子路徑時,其運行結果應該為:如果其中有一條子路徑能運行成功,則使用聚合的結果繼續執行后續步驟;如果所有子路徑都運行失敗,則停止整個路由(route)。本案例中,由于子路徑"direct:A"運行異常,子路徑"direct:B"運行正常,則應該正常執行后續兩個步驟日志(log)和轉換(transform)。

      運行上述案例,日志信息如下:

      2021-05-06 12:43:18.565? INFO 13956 --- [qtp916897446-42] route1?????????????????????????????????? : received request

      2021-05-06 12:43:18.566? INFO 13956 --- [qtp916897446-42] route1?????????????????????????????????? : Entering multicast

      2021-05-06 12:43:18.575? INFO 13956 --- [ #4 - Multicast] route2?????????????????????????????????? : Executing PATH_1 - exception path

      2021-05-06 12:43:18.575? INFO 13956 --- [ #4 - Multicast] route2?????????????????????????????????? : Starting exception throw

      2021-05-06 12:43:18.578? INFO 13956 --- [ #4 - Multicast] route2?????????????????????????????????? : Exception handler invoked

      2021-05-06 12:43:18.579? INFO 13956 --- [ #4 - Multicast] c.e.d.m.SimpleFlowMergeAggregator??????? : Inside aggregator {"data" : "err"}

      2021-05-06 12:43:19.575? INFO 13956 --- [ #3 - Multicast] route3?????????????????????????????????? : Executing PATH_2 - success path

      2021-05-06 12:43:21.576? INFO 13956 --- [ #3 - Multicast] route3?????????????????????????????????? : PATH_2

      2021-05-06 12:43:21.576? INFO 13956 --- [ #3 - Multicast] c.e.d.m.SimpleFlowMergeAggregator??????? : Inside aggregator DATA_FROM_PATH_2

      觀察上述日志,我們發現完成兩條子路徑結果的聚合后,后續的兩個步驟日志(log)和轉換(transform)并未執行。這并不符合我們期望的結果。

      經過多次測試,我們還發現,只有當到達聚合器SimpleFlowMergeAggregator的第一個子路徑("direct:A")執行異常時,便會發生這種后續步驟未執行的情況;而如果第一個子路徑("direct:A")執行成功,即使另一個子路徑("direct:B")執行失敗,也會繼續執行后續的步驟。

      4 問題分析

      接下來,我們通過查看Camel源代碼,來找出上述現象的原因。

      在camel-core-processors模塊的Pipeline.java 中,其run()方法中有這樣一段代碼:

      @Override

      public void run() {

      boolean stop = exchange.isRouteStop();

      int num = index;

      boolean more = num < size;

      boolean first = num == 0;

      if (!stop && more && (first || continueProcessing(exchange, "so breaking out of pipeline", LOG))) {

      // prepare for next run

      if (exchange.hasOut()) {

      exchange.setIn(exchange.getOut());

      exchange.setOut(null);

      }

      // get the next processor

      AsyncProcessor processor = processors.get(index++);

      processor.process(exchange, this);

      } else {

      // copyResults is needed in case MEP is OUT and the message is not an OUT message

      ExchangeHelper.copyResults(exchange, exchange);

      // logging nextExchange as it contains the exchange that might have altered the payload and since

      // we are logging the completion if will be confusing if we log the original instead

      // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots

      if (LOG.isTraceEnabled()) {

      LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);

      }

      AsyncCallback cb = callback;

      taskFactory.release(this);

      reactiveExecutor.schedule(cb);

      }

      }

      其中,這個if判斷決定了是否繼續執行后續步驟:

      if (!stop && more && (first || continueProcessing(exchange, "so breaking out of pipeline", LOG)))

      可以看出,在如下三種情況下,后續步驟將不會被執行:

      1. 之前的步驟已經將exchange 對象標記為停止狀態。

      boolean stop = exchange.isRouteStop();

      2. 后續沒有步驟可執行。

      boolean more = num < size;

      3. continueProcessing()方法返回false。

      我們來看看continueProcessing()方法的代碼。

      public final class PipelineHelper {

      public static boolean continueProcessing(Exchange exchange, String message, Logger log) {

      ExtendedExchange ee = (ExtendedExchange) exchange;

      boolean stop = ee.isFailed() || ee.isRollbackOnly() || ee.isRollbackOnlyLast()

      || (ee.isErrorHandlerHandledSet() && ee.isErrorHandlerHandled());

      if (stop) {

      if (log.isDebugEnabled()) {

      StringBuilder sb = new StringBuilder();

      sb.append("Message exchange has failed: ").append(message).append(" for exchange: ").append(exchange);

      if (exchange.isRollbackOnly() || exchange.isRollbackOnlyLast()) {

      sb.append(" Marked as rollback only.");

      }

      if (exchange.getException() != null) {

      sb.append(" Exception: ").append(exchange.getException());

      }

      if (ee.isErrorHandlerHandledSet() && ee.isErrorHandlerHandled()) {

      sb.append(" Handled by the error handler.");

      }

      log.debug(sb.toString());

      }

      return false;

      }

      if (ee.isRouteStop()) {

      if (log.isDebugEnabled()) {

      log.debug("ExchangeId: {} is marked to stop routing: {}", exchange.getExchangeId(), exchange);

      }

      使用Apache Camel Multicast組件遇到的一個問題(使用apache部署web網站)

      return false;

      }

      return true;

      }

      }

      可以看出,當執行過程發生異常并且被異常處理器捕獲時,continueProcessing()方法將返回false。

      再回到我們的案例,第一個到達聚合器SimpleFlowMergeAggregator的子路徑("direct:A"),會作為后續聚合的基礎,其它子路徑("direct:B")會在此基礎上追加各自的body數據。實際上,很多Camel用戶都會采用這種方式來實現自定義聚合策略。但這樣做存在一個問題:在異常處理時,子路徑"direct:A"的exchange對象會被設置一個狀態標識,而此狀態標識會被傳遞到下游,用于判斷是否繼續執行后續步驟。由于作為聚合基礎的"direct:A"子路徑的exchange對象狀態為“異常”,最終continueProcessing()方法將返回false,后續的步驟也就不會再執行。

      5 解決方案

      對于上述問題,用戶可以使用多種方式來設置異常處理時exchange對象的狀態。本文采用如下解決方案:如果第一個子路徑執行正常,則繼續執行后續步驟;如果第一個子路徑執行異常,則將其與其它執行成功的子路徑交換,然后繼續執行后續步驟。

      更新后的自定義聚合器SimpleFlowMergeAggregator如下:

      public class SimpleFlowMergeAggregator implements AggregationStrategy {

      private static final Logger LOGGER = LoggerFactory.getLogger(SimpleFlowMergeAggregator.class.getName());

      @Override

      public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {

      LOGGER.info("Inside aggregator " + newExchange.getIn().getBody());

      if(oldExchange == null) {

      String data = newExchange.getIn().getBody(String.class);

      List aggregatedDataList = new ArrayList<>();

      aggregatedDataList.add(data);

      newExchange.getIn().setBody(aggregatedDataList);

      return newExchange;

      }

      if(hadException(oldExchange)) {

      if(!hadException(newExchange)) {

      // aggregate and swap the base

      LOGGER.info("Found new exchange with success. swapping the base exchange");

      List oldData = oldExchange.getIn().getBody(List.class);

      oldData.add(newExchange.getIn().getBody(String.class));

      // swapped the base here

      newExchange.getIn().setBody(oldData);

      return newExchange;

      }

      }

      List oldData = oldExchange.getIn().getBody(List.class);

      oldData.add(newExchange.getIn().getBody(String.class));

      oldExchange.getIn().setBody(oldData);

      return oldExchange;

      }

      private boolean hadException(Exchange exchange) {

      if(exchange.isFailed()) {

      return true;

      }

      if(exchange.isRollbackOnly()) {

      return true;

      }

      if(exchange.isRollbackOnlyLast()) {

      return true;

      }

      if(((ExtendedExchange)exchange).isErrorHandlerHandledSet()

      && ((ExtendedExchange)exchange).isErrorHandlerHandled()) {

      return true;

      }

      return false;

      }

      }

      再次運行上述案例,日志信息如下:

      2021-05-06 12:46:19.122? INFO 2576 --- [qtp174245837-45] route1?????????????????????????????????? : received request

      2021-05-06 12:46:19.123? INFO 2576 --- [qtp174245837-45] route1?????????????????????????????????? : Entering multicast

      2021-05-06 12:46:19.130? INFO 2576 --- [ #3 - Multicast] route2?????????????????????????????????? : Executing PATH_1 - exception path

      2021-05-06 12:46:19.130? INFO 2576 --- [ #3 - Multicast] route2?????????????????????????????????? : Starting exception throw

      2021-05-06 12:46:19.134? INFO 2576 --- [ #3 - Multicast] route2?????????????????????????????????? : Exception handler invoked

      2021-05-06 12:46:19.135? INFO 2576 --- [ #3 - Multicast] c.e.d.m.SimpleFlowMergeAggregator??????? : Inside aggregator {"data" : "err"}

      2021-05-06 12:46:20.130? INFO 2576 --- [ #4 - Multicast] route3?????????????????????????????????? : Executing PATH_2 - success path

      2021-05-06 12:46:22.132? INFO 2576 --- [ #4 - Multicast] route3?????????????????????????????????? : PATH_2

      2021-05-06 12:46:22.132? INFO 2576 --- [ #4 - Multicast] c.e.d.m.SimpleFlowMergeAggregator??????? : Inside aggregator DATA_FROM_PATH_2

      2021-05-06 12:46:22.132? INFO 2576 --- [ #4 - Multicast] c.e.d.m.SimpleFlowMergeAggregator??????? : Found new exchange with success. swapping the base exchange

      2021-05-06 12:46:22.133? INFO 2576 --- [ #4 - Multicast] route1?????????????????????????????????? : Aggregated results {"data" : "err"},DATA_FROM_PATH_2

      2021-05-06 12:46:22.133? INFO 2576 --- [ #4 - Multicast] route1?????????????????????????????????? : Another log

      可以看出,使用新的自定義聚合策略后,后續的日志(log)和轉換(transform)步驟都成功執行。

      6 結語

      本文通過案例,發現了一個Camel Multicast組件聚合策略相關的問題。通過查看Camel源代碼,找到了問題原因并給出了解決方案。

      希望本文可以幫助到遇到同樣問題的Camel用戶。

      版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。

      上一篇:excel表格間隔求和的方法教程詳解(excel表格如何間隔求和)
      下一篇:微認證之華為企業級JAVA編程規范(華為java開發規范)
      相關文章
      亚洲精品无码少妇30P| 亚洲最新黄色网址| 亚洲综合色区中文字幕| 亚洲AV人无码综合在线观看 | 亚洲欧洲精品无码AV| 国产午夜亚洲精品不卡免下载| 亚洲国产成人久久一区二区三区| 最新亚洲卡一卡二卡三新区| 亚洲依依成人亚洲社区| 亚洲人成小说网站色| 亚洲熟妇无码AV| 亚洲成AV人影片在线观看| 亚洲av无码一区二区三区四区| 亚洲aⅴ无码专区在线观看春色| 亚洲国产av玩弄放荡人妇| 国产成人久久精品亚洲小说| 国产精品亚洲а∨无码播放麻豆| 亚洲AV日韩AV一区二区三曲 | 亚洲AV无码乱码国产麻豆| 亚洲AV无码精品色午夜在线观看| 亚洲av永久无码精品国产精品| 久久久亚洲精品国产| 亚洲宅男永久在线| 亚洲同性男gay网站在线观看| 亚洲人成在线播放| 亚洲私人无码综合久久网| 亚洲成av人片在线天堂无| 国产亚洲男人的天堂在线观看| 亚洲色欲久久久久综合网| 亚洲熟女一区二区三区| 亚洲va在线va天堂va888www| 久久精品国产精品亚洲毛片| 亚洲国产日韩在线成人蜜芽| 亚洲综合av一区二区三区| 国产亚洲精品第一综合| 亚洲综合色自拍一区| 亚洲成a人片77777老司机| 亚洲国产精品免费在线观看| 亚洲色大18成人网站WWW在线播放| 国产成人精品久久亚洲高清不卡| 亚洲人成网站观看在线播放|