I had a case to implement with
Apache Camel, where the application reads XML files produced by an external system, imports the data into Oracle database, and in a while, it should process the data which has been reviewed by some business person.
Here's the simplified main code implementing the process:
package my.app;
import org.apache.camel.Processor;
import org.apache.camel.Exchange;
import org.apache.camel.spring.SpringRouteBuilder;
public class MyRouteBuilder extends SpringRouteBuilder {
protected void configureImportRoute() {
String filesUri = "file:files/payments" +
"?initialDelay=3000" +
"&delay=1000" +
"&useFixedDelay=true" +
"&include=.*[.]xml" +
"&move=backup/${file:name.noext}-${date:now:yyyyMMddHHmmssSSS}.xml" +
"&moveFailed=files/${file:name.noext}-${date:now:yyyyMMddHHmmssSSS}.xml.error";
from(filesUri).convertBodyTo(MyBean.class).transacted().to("importProcessor");
String executionTriggerUri = "timer:executionTimer"
+ "?fixedRate=true"
+ "&daemon=true"
+ "&delay=3000"
+ "&period=3000";
from(executionTriggerUri)
.pipeline("bean:myDao?method=listItemsForExecution")
.to("executioncProcessor");
}
What I wanted to do is to synchronize the routes. There could be
several ways for doing this: using Camel BAM module, using a polling consumer and checking a flag. I came up with a solution that logically looks almost the same as
Claus proposed, but instead of some explicit flag I used thread synchronization trick. Here's how the proof of concept code looks like.
The route builder:
package my.dummy.app;
import org.apache.camel.Processor;
import org.apache.camel.Exchange;
import org.apache.camel.spring.SpringRouteBuilder;
public class Route extends SpringRouteBuilder {
public void configure() {
from("timer:TriggerA?delay=100&period=1").to("A");
from("timer:TriggerB?delay=100&period=1").to("B");
from("timer:TriggerC?delay=100&period=1").to("C");
}
}
Here we can see 3 concurrent routes being executed via timer component periodically.
I created 3 dummy processors that are emulating the business logic processors in the real application. Its function is just to create a delay, using Thread.sleep() with random period, and logs a debug message.
package my.dummy.app;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.log4j.Logger;
import java.util.Random;
public class A /*B*/ /*C*/ implements Processor {
Random r = new Random();
private static final Logger log = Logger.getLogger(Processor.class);
public void process(Exchange exchange) throws Exception {
Thread.sleep(r.nextInt(1000));
log.info("processing " + exchange + " in " + getClass().getName());
}
}
The Spring configuration for this dummy application is as follows:
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://www.springframework.org/schema/beans" xsi:schemalocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<camelcontext id="importing" xmlns="http://camel.apache.org/schema/spring">
<packagescan>
<package>my.dummy.app</package>
</packagescan>
</camelcontext>
<bean class="my.dummy.app.A" id="A">
<bean class="my.dummy.app.B" id="B">
<bean class="my.dummy.app.C" id="C">
</beans>
I noticed that there's a DelegateProcessor in Camel that could be used to wrap the real processors. So I can use it to synchronize the routes like this:
package my.dummy.app;
import org.apache.log4j.Logger;
import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.processor.DelegateProcessor;
public class RouteSynchronizer extends DelegateProcessor {
private Logger log = Logger.getLogger(RouteSynchronizer.class);
private final static Object sync = new Object();
public void process(Exchange exchange) throws Exception {
synchronized (sync) {
log.debug("begin exchange processing by " + Thread.currentThread().getName());
super.process(exchange);
try {
if (exchange.isFailed()) {
throw new RuntimeCamelException(exchange.getException());
}
} finally {
log.debug("end exchange processing by " + Thread.currentThread().getName());
}
}
}
}
Now I can wrap my original processor beans my the brand new delegate processor as follows:
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://www.springframework.org/schema/beans" xsi:schemalocation="</p><p>http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd</p><p>http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<camelcontext id="importing" xmlns="http://camel.apache.org/schema/spring">
<packagescan>
<package>my.dummy.app</package>
</packagescan>
<bean class="my.dummy.app.RouteSynchronizer" id="A"/>
<property name="processor"/>
</bean>
<bean class="my.dummy.app.A"/>
<bean class="my.dummy.app.RouteSynchronizer" id="B"/>
<bean class="my.dummy.app.B"/>
<bean class="my.dummy.app.RouteSynchronizer" id="C"/>
<bean class="my.dymmy.app.C"/>
<camelcontext>
</beans>
With this solution there's a performance penalty due to synchronization but this is not critical for my application at least and confirms to the requirements.
Here's how the dummy application log looks like:
12-08-2009 15:06:01,970 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerB?delay=100&period=1
12-08-2009 15:06:02,298 INFO Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.B
12-08-2009 15:06:02,313 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerB?delay=100&period=1
12-08-2009 15:06:02,313 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerC?delay=100&period=1
12-08-2009 15:06:03,032 INFO Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.C
12-08-2009 15:06:03,032 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerC?delay=100&period=1
12-08-2009 15:06:03,032 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerA?delay=100&period=1
12-08-2009 15:06:03,173 INFO Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.A
12-08-2009 15:06:03,173 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerA?delay=100&period=1
12-08-2009 15:06:03,173 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerB?delay=100&period=1
12-08-2009 15:06:03,579 INFO Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.B
12-08-2009 15:06:03,579 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerB?delay=100&period=1
12-08-2009 15:06:03,579 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerC?delay=100&period=1
12-08-2009 15:06:04,251 INFO Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.C
12-08-2009 15:06:04,251 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerC?delay=100&period=1
12-08-2009 15:06:04,251 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerA?delay=100&period=1
12-08-2009 15:06:04,626 INFO Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.A
12-08-2009 15:06:04,626 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerA?delay=100&period=1
12-08-2009 15:06:04,626 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerB?delay=100&period=1
12-08-2009 15:06:05,251 INFO Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.B
12-08-2009 15:06:05,251 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerB?delay=100&period=1
12-08-2009 15:06:05,251 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerC?delay=100&period=1
12-08-2009 15:06:05,688 INFO Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.C
12-08-2009 15:06:05,688 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerC?delay=100&period=1
12-08-2009 15:06:05,688 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerA?delay=100&period=1
12-08-2009 15:06:05,782 INFO Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.A
12-08-2009 15:06:05,782 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerA?delay=100&period=1
12-08-2009 15:06:05,782 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerB?delay=100&period=1
12-08-2009 15:06:06,298 INFO Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.B
12-08-2009 15:06:06,298 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerB?delay=100&period=1
From the log we can see that this concept actually works - no crossing between the routes appeared.
What could be nice to have is the same support in the DSL, like:
from("...").synchronize().to("processorA");
from("...").synchronize().to("processorA");