关于范例
Camel 有一个包含交换的工作单元概念。除其他外,该工作单元还支持同步回调,当交换完成时会调用这些回调。回调 API 定义于 org.apache.camel.spi.Synchronization
和扩展同步 org.apache.camel.spi.SynchronizationRouteAware
中,后者包含路由事件的回调。
完成时 DSL
OnCompletion EIP 支持以下功能:
-
级别:上下文或路由(路由级别高于全局级别) -
总是触发、仅在成功完成时触发或仅在失败时触发 -
onWhen
谓词仅在匹配时触发 -
mode
用于定义在路由消费者将响应写回给调用者(如果是 InOut)之前或之后运行(默认为 AfterConsumer)。 -
parallelProcessing
异步还是同步运行(是否使用线程池)(默认为 false)
onCompletion 支持以同步或异步模式(使用线程池)运行完成任务,也支持在路由消费者完成之前或之后运行。这样做是为了提供更大的灵活性。例如,指定在路由消费者完成之前同步运行,这样就可以在消费者向被调用者写回任何响应之前修改交换。例如,您可以利用这一点添加客户头,或发送到日志以记录响应信息等。
有路由范围的完成时
当原始 Exchange 完成时,OnCompletion EIP 允许您添加自定义路由/处理器。Camel 会分离出一个 Exchange 副本,并在一个单独的线程中对其进行路由,类似于 Wire Tap。这样,当 onCompletion 路由同时运行时,原始线程仍可继续。我们之所以选择这种模式,是因为我们不希望 onCompletion 路由干扰原始路由。
多次完成
您可以在上下文和路由级别定义多个 onCompletions。
如果定义了路由级别的 onCompletions,那么该路由的任何上下文级别都会被禁用。
from("direct:start")
.onCompletion()
// this route is only invoked when the original route is complete as a kind
// of completion callback
.to("log:sync")
.to("mock:sync")
// must use end to denote the end of the onCompletion route
.end()
// here the original route contiunes
.process(new MyProcessor())
.to("mock:result");
默认情况下,OnCompletion EIP 将在交换完成时触发,无论交换是成功完成还是失败(如抛出异常)。您可以将触发限制为仅发生 onCompleteOnly
或通过 onFailureOnly
触发,如下所示:
from("direct:start")
// here we qualify onCompletion to only invoke when the exchange failed (exception or FAULT body)
.onCompletion().onFailureOnly()
.to("log:sync")
.to("mock:syncFail")
// must use end to denote the end of the onCompletion route
.end()
.onCompletion().onCompleteOnly()
.to("log:sync")
.to("mock:syncOK")
.end()
// here the original route continues
.process(new MyProcessor())
.to("mock:result");
您可以通过 Camel 将添加布尔值为 true
的属性 Exchange.ON_COMPLETION
来识别 Exchange 是否为 OnCompletion Exchange。.
使用来自 xml dsl 的 oncompletion
onCompletion 是这样用 XML DSL 定义的:
<route>
<from uri="direct:start"/>
<!-- this onCompletion block will only be executed when the exchange is done being routed -->
<!-- this callback is always triggered even if the exchange failed -->
<onCompletion>
<!-- so this is a kinda like an after completion callback -->
<to uri="log:sync"/>
<to uri="mock:sync"/>
</onCompletion>
<process ref="myProcessor"/>
<to uri="mock:result"/>
</route>
而 onCompleteOnly
和 onFailureOnly
被定义为 <onCompletion>
标签上的布尔属性,因此失败示例为
<route>
<from uri="direct:start"/>
<!-- this onCompletion block will only be executed when the exchange is done being routed -->
<!-- this callback is only triggered when the exchange failed, as we have onFailureOnly=true -->
<onCompletion onFailureOnly="true">
<to uri="log:sync"/>
<to uri="mock:sync"/>
</onCompletion>
<process ref="myProcessor"/>
<to uri="mock:result"/>
</route>
在全球一级完成
其工作原理与路由级别相同,只是它们是全局定义的。下面是一个例子:
// define a global on completion that is invoked when the exchange is done being routed
onCompletion().to("log:global").to("mock:sync");
from("direct:start")
.process(new MyProcessor())
.to("mock:result");
还有 XML:
<!-- this is a global onCompletion route that is invoked when any exchange is done being routed
as a kind of after callback -->
<onCompletion>
<to uri="log:global"/>
<to uri="mock:sync"/>
</onCompletion>
<route>
<from uri="direct:start"/>
<process ref="myProcessor"/>
<to uri="mock:result"/>
</route>
如果在路由中定义了 onCompletion ,它会覆盖所有全局作用域,因此只使用路由作用域。全局作用域不会被使用。 |
使用带 onwhen 谓词的 oncompletion
与 Camel 的其他 DSL 一样,您可以为 onCompletion
附加谓词。,因此只有在谓词匹配的特定条件下才会触发。例如,要只在邮件正文包含单词 "Hello "时触发,我们可以这样做
from("direct:start")
.onCompletion().onWhen(body().contains("Hello"))
// this route is only invoked when the original route is done being routed
// and the onWhen predicate is true
.to("log:sync")
.to("mock:sync")
// must use end to denote the end of the onCompletion route
.end()
// here the original route continues
.to("log:original")
.to("mock:result");
在有线程池或没有线程池的情况下使用 oncompletion
若要使用线程池,请设置 executorService
或将 parallelProcessing
设为 true。
例如,在 Java DSL 中
onCompletion().parallelProcessing()
.to("mock:before")
.delay(1000)
.setBody(simple("OnComplete:${body}"));
以及 XML DSL:
<onCompletion parallelProcessing="true">
<to uri="mock:before"/>
<delay><constant>1000</constant></delay>
<setBody><simple>OnComplete:${body}</simple></setBody>
</onCompletion>
您也可以使用 executorServiceRef
选项,指定要使用的特定线程池
<onCompletion executorServiceRef="myThreadPool">
<to uri="mock:before"/>
<delay><constant>1000</constant></delay>
<setBody><simple>OnComplete:${body}</simple></setBody>
</onCompletion>
完成时消费模式
OnCompletion 支持两种影响路由消费者的模式:
-
AfterConsumer - 消费完成后运行的默认模式 -
BeforeConsumer - 在消费者完成之前运行,并且在消费者向被调用者写回响应之前运行
AfterConsumer 模式是默认模式,与 Camel 旧版本中的行为相同。
新的 "消费者之前"(BeforeConsumer)模式用于在消费者将其响应写回给被调用者之前运行 onCompletion
(如果采用 "输入输出 "模式)。这样, onCompletion
就可以修改 Exchange,例如添加特殊标头,或将 Exchange 记录为响应记录器等。
例如,要始终添加 "创建者 "标题,可以使用 modeBeforeConsumer()
,如下所示:
.onCompletion().modeBeforeConsumer()
.setHeader("createdBy", constant("Someone"))
.end()
在 XML DSL 中,您可以将模式属性设置为 BeforeConsumer:
<onCompletion mode="BeforeConsumer">
<setHeader name="createdBy">
<constant>Someone</constant>
</setHeader>
</onCompletion>