Here's the simplified main code implementing the process:
package my.app;

import org.apache.camel.Processor;
import org.apache.camel.Exchange;
import org.apache.camel.spring.SpringRouteBuilder;

public class MyRouteBuilder extends SpringRouteBuilder {

    protected void configureImportRoute() {
        // Poll the payments directory for XML files; back up processed
        // files and mark failed ones with an .error suffix.
        String filesUri = "file:files/payments"
                + "?initialDelay=3000"
                + "&delay=1000"
                + "&useFixedDelay=true"
                + "&include=.*[.]xml"
                + "&move=backup/${file:name.noext}-${date:now:yyyyMMddHHmmssSSS}.xml"
                + "&moveFailed=files/${file:name.noext}-${date:now:yyyyMMddHHmmssSSS}.xml.error";

        from(filesUri).convertBodyTo(MyBean.class).transacted().to("importProcessor");

        // Trigger the execution step every 3 seconds.
        String executionTriggerUri = "timer:executionTimer"
                + "?fixedRate=true"
                + "&daemon=true"
                + "&delay=3000"
                + "&period=3000";

        from(executionTriggerUri)
                .pipeline("bean:myDao?method=listItemsForExecution")
                .to("executionProcessor");
    }
}
What I wanted to do was synchronize the routes so that their processing never overlaps. There are several ways to do this, such as using the Camel BAM module, or using a polling consumer and checking a flag. I came up with a solution that is logically almost the same as what Claus proposed, but instead of an explicit flag I used a thread synchronization trick. Here's what the proof-of-concept code looks like.
The route builder:
package my.dummy.app;

import org.apache.camel.Processor;
import org.apache.camel.Exchange;
import org.apache.camel.spring.SpringRouteBuilder;

public class Route extends SpringRouteBuilder {

    public void configure() {
        from("timer:TriggerA?delay=100&period=1").to("A");
        from("timer:TriggerB?delay=100&period=1").to("B");
        from("timer:TriggerC?delay=100&period=1").to("C");
    }
}
Here we can see three concurrent routes, each triggered periodically by the timer component.
I created three dummy processors that emulate the business logic processors of the real application. Each one just creates a delay, using Thread.sleep() with a random period, and logs an info message.
package my.dummy.app;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.log4j.Logger;

import java.util.Random;

public class A /*B*/ /*C*/ implements Processor {

    Random r = new Random();

    private static final Logger log = Logger.getLogger(Processor.class);

    public void process(Exchange exchange) throws Exception {
        // Emulate business logic with a random delay of up to one second.
        Thread.sleep(r.nextInt(1000));
        log.info("processing " + exchange + " in " + getClass().getName());
    }
}
The Spring configuration for this dummy application is as follows:
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
         http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

  <camelContext id="importing" xmlns="http://camel.apache.org/schema/spring">
    <packageScan>
      <package>my.dummy.app</package>
    </packageScan>
  </camelContext>

  <bean class="my.dummy.app.A" id="A"/>
  <bean class="my.dummy.app.B" id="B"/>
  <bean class="my.dummy.app.C" id="C"/>
</beans>

I noticed that Camel has a DelegateProcessor that can be used to wrap the real processors, so I can use it to synchronize the routes like this:
package my.dummy.app;

import org.apache.log4j.Logger;
import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.processor.DelegateProcessor;

public class RouteSynchronizer extends DelegateProcessor {

    private Logger log = Logger.getLogger(RouteSynchronizer.class);

    // A single lock shared by all RouteSynchronizer instances,
    // so only one wrapped processor runs at a time.
    private final static Object sync = new Object();

    public void process(Exchange exchange) throws Exception {
        synchronized (sync) {
            log.debug("begin exchange processing by " + Thread.currentThread().getName());
            super.process(exchange);
            try {
                if (exchange.isFailed()) {
                    throw new RuntimeCamelException(exchange.getException());
                }
            } finally {
                log.debug("end exchange processing by " + Thread.currentThread().getName());
            }
        }
    }
}
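The locking idea itself can be tried without Camel on the classpath. The following is a minimal sketch using only the JDK: the class SharedLockDemo and its methods are hypothetical names made up for illustration, and plain Runnable tasks stand in for the Camel processors. It wraps a task in a block guarded by one static lock, runs it from several threads, and records how many tasks were ever inside the guarded section at once.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone demo; not part of the Camel application above.
class SharedLockDemo {

    // One lock for the whole JVM, like the static "sync" field in RouteSynchronizer.
    private static final Object LOCK = new Object();

    /** Wraps a task so that only one wrapped task can run at any moment. */
    static Runnable synchronizedTask(Runnable delegate) {
        return () -> {
            synchronized (LOCK) {
                delegate.run();
            }
        };
    }

    /** Runs the wrapped task from several threads and returns the peak overlap observed. */
    static int peakConcurrency(int workers, int rounds) {
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        // Dummy work: count ourselves in, sleep briefly, count ourselves out.
        Runnable work = () -> {
            peak.accumulateAndGet(active.incrementAndGet(), Math::max);
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                active.decrementAndGet();
            }
        };

        Runnable guarded = synchronizedTask(work);
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int w = 0; w < workers; w++) {
            pool.submit(() -> {
                for (int i = 0; i < rounds; i++) {
                    guarded.run();
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrent tasks: " + peakConcurrency(3, 5));
    }
}
```

Because the counter is only touched while the lock is held, the peak stays at one no matter how many worker threads are started. The price is the same as in the Camel version: all threads queue up behind a single lock.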
Now I can wrap my original processor beans with the brand new delegate processor as follows:
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
         http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

  <camelContext id="importing" xmlns="http://camel.apache.org/schema/spring">
    <packageScan>
      <package>my.dummy.app</package>
    </packageScan>
  </camelContext>

  <bean class="my.dummy.app.RouteSynchronizer" id="A">
    <property name="processor">
      <bean class="my.dummy.app.A"/>
    </property>
  </bean>

  <bean class="my.dummy.app.RouteSynchronizer" id="B">
    <property name="processor">
      <bean class="my.dummy.app.B"/>
    </property>
  </bean>

  <bean class="my.dummy.app.RouteSynchronizer" id="C">
    <property name="processor">
      <bean class="my.dummy.app.C"/>
    </property>
  </bean>
</beans>
With this solution there is a performance penalty due to synchronization, but it is not critical for my application, and the solution conforms to the requirements.
Here's what the dummy application log looks like:
12-08-2009 15:06:01,970 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerB?delay=100&period=1
12-08-2009 15:06:02,298 INFO Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.B
12-08-2009 15:06:02,313 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerB?delay=100&period=1
12-08-2009 15:06:02,313 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerC?delay=100&period=1
12-08-2009 15:06:03,032 INFO Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.C
12-08-2009 15:06:03,032 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerC?delay=100&period=1
12-08-2009 15:06:03,032 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerA?delay=100&period=1
12-08-2009 15:06:03,173 INFO Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.A
12-08-2009 15:06:03,173 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerA?delay=100&period=1
12-08-2009 15:06:03,173 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerB?delay=100&period=1
12-08-2009 15:06:03,579 INFO Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.B
12-08-2009 15:06:03,579 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerB?delay=100&period=1
12-08-2009 15:06:03,579 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerC?delay=100&period=1
12-08-2009 15:06:04,251 INFO Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.C
12-08-2009 15:06:04,251 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerC?delay=100&period=1
12-08-2009 15:06:04,251 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerA?delay=100&period=1
12-08-2009 15:06:04,626 INFO Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.A
12-08-2009 15:06:04,626 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerA?delay=100&period=1
12-08-2009 15:06:04,626 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerB?delay=100&period=1
12-08-2009 15:06:05,251 INFO Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.B
12-08-2009 15:06:05,251 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerB?delay=100&period=1
12-08-2009 15:06:05,251 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerC?delay=100&period=1
12-08-2009 15:06:05,688 INFO Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.C
12-08-2009 15:06:05,688 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerC?delay=100&period=1
12-08-2009 15:06:05,688 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerA?delay=100&period=1
12-08-2009 15:06:05,782 INFO Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.A
12-08-2009 15:06:05,782 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerA?delay=100&period=1
12-08-2009 15:06:05,782 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerB?delay=100&period=1
12-08-2009 15:06:06,298 INFO Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.B
12-08-2009 15:06:06,298 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerB?delay=100&period=1
From the log we can see that the concept actually works: every "begin" is followed by the matching "end" before the next route starts, so the routes never cross.
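Checking a long log by eye gets tedious, so the same check can be automated. This is a small sketch, assuming the log keeps the "begin exchange processing by" and "end exchange processing by" messages written by RouteSynchronizer; the class InterleavingCheck and its method are hypothetical helpers, not part of the application.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for scanning the debug log of RouteSynchronizer.
class InterleavingCheck {

    private static final String BEGIN = "begin exchange processing by ";
    private static final String END = "end exchange processing by ";

    /**
     * Returns true if every "begin" line is closed by an "end" line from the
     * same thread before any other "begin" line appears.
     */
    static boolean isSerialized(List<String> lines) {
        String owner = null; // thread currently inside the guarded section, if any
        for (String line : lines) {
            int begin = line.indexOf(BEGIN);
            int end = line.indexOf(END);
            if (begin >= 0) {
                if (owner != null) {
                    return false; // a second route entered before the first one left
                }
                owner = line.substring(begin + BEGIN.length()).trim();
            } else if (end >= 0) {
                String thread = line.substring(end + END.length()).trim();
                if (!thread.equals(owner)) {
                    return false; // "end" without a matching "begin"
                }
                owner = null;
            }
            // Any other line (for example the INFO output) is ignored.
        }
        return true; // a log cut off in the middle of a section is not treated as a violation
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
            "DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerB?delay=100&period=1",
            "INFO Processor - processing Exchange[Message: [Body is null]] in B",
            "DEBUG RouteSynchronizer - end exchange processing by timer://TriggerB?delay=100&period=1",
            "DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerC?delay=100&period=1",
            "DEBUG RouteSynchronizer - end exchange processing by timer://TriggerC?delay=100&period=1");
        System.out.println("serialized: " + isSerialized(sample));
    }
}
```

The check only looks at the order of the lines, so it relies on the logger writing them in the order they were produced.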
What could be nice to have is the same support in the DSL, like:
from("...").synchronize().to("processorA");
from("...").synchronize().to("processorA");
1 comment:
Nice blog post.
The .threads(1) option should do this as well, since the JDK executor will then have only one thread in its pool.
But since it also has the effect of using a JDK task to route (i.e. using another thread), your processor approach can ensure that the same current thread is used if needed.
I think you can use a Barrier in the JDK to indicate how many concurrent exchanges you want.
Maybe something we need for the Camel core as well? Gonna think a bit about this. But feel free to raise a ticket in JIRA so we won't forget.
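To make the idea of allowing a chosen number of concurrent exchanges concrete, here is a minimal JDK-only sketch. Note that it swaps in java.util.concurrent.Semaphore rather than a barrier, on the assumption that a counting semaphore is the closer fit for capping concurrency; the class BoundedConcurrencyDemo and its methods are hypothetical, and plain Runnable tasks stand in for Camel processors.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone demo; not a Camel API.
class BoundedConcurrencyDemo {

    /** Wraps a task so that it runs only while holding one permit of the semaphore. */
    static Runnable limited(Semaphore semaphore, Runnable delegate) {
        return () -> {
            semaphore.acquireUninterruptibly();
            try {
                delegate.run();
            } finally {
                semaphore.release();
            }
        };
    }

    /** Runs the limited task from several threads and returns the peak overlap observed. */
    static int peakConcurrency(int permits, int workers, int rounds) {
        Semaphore semaphore = new Semaphore(permits);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        Runnable guarded = limited(semaphore, () -> {
            peak.accumulateAndGet(active.incrementAndGet(), Math::max);
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                active.decrementAndGet();
            }
        });

        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int w = 0; w < workers; w++) {
            pool.submit(() -> {
                for (int i = 0; i < rounds; i++) {
                    guarded.run();
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak with 2 permits: " + peakConcurrency(2, 4, 3));
    }
}
```

With one permit this behaves like the synchronized block in RouteSynchronizer, and like that approach it runs the task on the calling thread rather than handing it to another one.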