这是用户在 2024-3-28 21:43 为 https://camel.apache.org/manual/oncompletion.html 保存的双语快照页面,由 沉浸式翻译 提供双语支持。了解如何保存?

 关于范例


Camel 有一个包含交换的工作单元概念。除其他外,该工作单元还支持同步回调,当交换完成时会调用这些回调。回调 API 定义于 org.apache.camel.spi.Synchronization 和扩展同步 org.apache.camel.spi.SynchronizationRouteAware 中,后者包含路由事件的回调。

 工作单元 API


您可以使用 getUnitOfWork() 方法从 org.apache.camel.Exchange 获取 org.apache.camel.spi.UnitOfWork 。.

 完成时 DSL


OnCompletion EIP 支持以下功能:


  • 级别:上下文或路由(路由级别高于全局级别)


  • 总是触发、仅在成功完成时触发或仅在失败时触发


  • onWhen 谓词仅在匹配时触发


  • mode 用于定义在路由消费者将响应写回给调用者(如果是 InOut)之前或之后运行(默认为 AfterConsumer)。


  • parallelProcessing 异步还是同步运行(是否使用线程池)(默认为 false)


onCompletion 支持以同步或异步模式(使用线程池)运行完成任务,也支持在路由消费者完成之前或之后运行。这样做是为了提供更大的灵活性。例如,指定在路由消费者完成之前同步运行,这样就可以在消费者向被调用者写回任何响应之前修改交换。例如,您可以利用这一点添加客户头,或发送到日志以记录响应信息等。


有路由范围的完成时


当原始 Exchange 完成时,OnCompletion EIP 允许您添加自定义路由/处理器。Camel 会分离出一个 Exchange 副本,并在一个单独的线程中对其进行路由,类似于 Wire Tap。这样,当 onCompletion 路由同时运行时,原始线程仍可继续。我们之所以选择这种模式,是因为我们不希望 onCompletion 路由干扰原始路由。

 多次完成


您可以在上下文和路由级别定义多个 onCompletions。


如果定义了路由级别的 onCompletions,那么该路由的任何上下文级别都会被禁用。

from("direct:start")
    .onCompletion()
        // this route is only invoked when the original route is complete as a kind
        // of completion callback
        .to("log:sync")
        .to("mock:sync")
    // must use end to denote the end of the onCompletion route
    .end()
    // here the original route contiunes
    .process(new MyProcessor())
    .to("mock:result");


默认情况下,OnCompletion EIP 将在交换完成时触发,无论交换是成功完成还是失败(如抛出异常)。您可以将触发限制为仅发生 onCompleteOnly 或通过 onFailureOnly 触发,如下所示:

from("direct:start")
    // here we qualify onCompletion to only invoke when the exchange failed (exception or FAULT body)
    .onCompletion().onFailureOnly()
        .to("log:sync")
        .to("mock:syncFail")
    // must use end to denote the end of the onCompletion route
    .end()
    .onCompletion().onCompleteOnly()
        .to("log:sync")
        .to("mock:syncOK")
    .end()
    // here the original route continues
    .process(new MyProcessor())
    .to("mock:result");


您可以通过 Camel 将添加布尔值为 true 的属性 Exchange.ON_COMPLETION 来识别 Exchange 是否为 OnCompletion Exchange。.


使用来自 xml dsl 的 oncompletion


onCompletion 是这样用 XML DSL 定义的:

<route>
    <from uri="direct:start"/>
    <!-- this onCompletion block will only be executed when the exchange is done being routed -->
    <!-- this callback is always triggered even if the exchange failed -->
    <onCompletion>
        <!-- so this is a kinda like an after completion callback -->
        <to uri="log:sync"/>
        <to uri="mock:sync"/>
    </onCompletion>
    <process ref="myProcessor"/>
    <to uri="mock:result"/>
</route>


onCompleteOnlyonFailureOnly 被定义为 <onCompletion> 标签上的布尔属性,因此失败示例为

<route>
    <from uri="direct:start"/>
    <!-- this onCompletion block will only be executed when the exchange is done being routed -->
    <!-- this callback is only triggered when the exchange failed, as we have onFailureOnly=true -->
    <onCompletion onFailureOnly="true">
        <to uri="log:sync"/>
        <to uri="mock:sync"/>
    </onCompletion>
    <process ref="myProcessor"/>
    <to uri="mock:result"/>
</route>


在全球一级完成


其工作原理与路由级别相同,只是它们是全局定义的。下面是一个例子:

// define a global on completion that is invoked when the exchange is done being routed
onCompletion().to("log:global").to("mock:sync");

from("direct:start")
    .process(new MyProcessor())
    .to("mock:result");

 还有 XML:

<!-- this is a global onCompletion route that is invoked when any exchange is done being routed
     as a kind of after callback -->
<onCompletion>
    <to uri="log:global"/>
    <to uri="mock:sync"/>
</onCompletion>

<route>
    <from uri="direct:start"/>
    <process ref="myProcessor"/>
    <to uri="mock:result"/>
</route>

如果在路由中定义了 onCompletion ,它会覆盖所有全局作用域,因此只使用路由作用域。全局作用域不会被使用。


使用带 onwhen 谓词的 oncompletion


与 Camel 的其他 DSL 一样,您可以为 onCompletion 附加谓词。,因此只有在谓词匹配的特定条件下才会触发。例如,要只在邮件正文包含单词 "Hello "时触发,我们可以这样做

from("direct:start")
    .onCompletion().onWhen(body().contains("Hello"))
        // this route is only invoked when the original route is done being routed
        // and the onWhen predicate is true
        .to("log:sync")
        .to("mock:sync")
    // must use end to denote the end of the onCompletion route
    .end()
    // here the original route continues
    .to("log:original")
    .to("mock:result");


在有线程池或没有线程池的情况下使用 oncompletion


若要使用线程池,请设置 executorService 或将 parallelProcessing 设为 true。


例如,在 Java DSL 中

onCompletion().parallelProcessing()
    .to("mock:before")
    .delay(1000)
    .setBody(simple("OnComplete:${body}"));


以及 XML DSL:

<onCompletion parallelProcessing="true">
  <to uri="mock:before"/>
  <delay><constant>1000</constant></delay>
  <setBody><simple>OnComplete:${body}</simple></setBody>
</onCompletion>


您也可以使用 executorServiceRef 选项,指定要使用的特定线程池

<onCompletion executorServiceRef="myThreadPool">
  <to uri="mock:before"/>
  <delay><constant>1000</constant></delay>
  <setBody><simple>OnComplete:${body}</simple></setBody>
</onCompletion>


完成时消费模式


OnCompletion 支持两种影响路由消费者的模式:


  • AfterConsumer - 消费完成后运行的默认模式


  • BeforeConsumer - 在消费者完成之前运行,并且在消费者向被调用者写回响应之前运行


AfterConsumer 模式是默认模式,与 Camel 旧版本中的行为相同。


新的 "消费者之前"(BeforeConsumer)模式用于在消费者将其响应写回给被调用者之前运行 onCompletion (如果采用 "输入输出 "模式)。这样, onCompletion 就可以修改 Exchange,例如添加特殊标头,或将 Exchange 记录为响应记录器等。


例如,要始终添加 "创建者 "标题,可以使用 modeBeforeConsumer() ,如下所示:

.onCompletion().modeBeforeConsumer()
    .setHeader("createdBy", constant("Someone"))
.end()


在 XML DSL 中,您可以将模式属性设置为 BeforeConsumer:

<onCompletion mode="BeforeConsumer">
  <setHeader name="createdBy">
    <constant>Someone</constant>
  </setHeader>
</onCompletion>