
Wednesday, August 12, 2009

Apache Camel: Route Synchronization

I had a case to implement with Apache Camel: the application reads XML files produced by an external system, imports the data into an Oracle database, and some time later processes the data once it has been reviewed by a business person.



Here's the simplified main code implementing the process:

package my.app;

import org.apache.camel.Processor;
import org.apache.camel.Exchange;
import org.apache.camel.spring.SpringRouteBuilder;

public class MyRouteBuilder extends SpringRouteBuilder {

 protected void configureImportRoute() {
   String filesUri = "file:files/payments" + 
                     "?initialDelay=3000" + 
                     "&delay=1000" + 
                     "&useFixedDelay=true" +
                     "&include=.*[.]xml" +
                     "&move=backup/${file:name.noext}-${date:now:yyyyMMddHHmmssSSS}.xml" +
                     "&moveFailed=files/${file:name.noext}-${date:now:yyyyMMddHHmmssSSS}.xml.error";

   from(filesUri).convertBodyTo(MyBean.class).transacted().to("importProcessor");

   String executionTriggerUri = "timer:executionTimer"
                              + "?fixedRate=true"
                              + "&daemon=true"
                              + "&delay=3000"
                              + "&period=3000";

   from(executionTriggerUri)
    .pipeline("bean:myDao?method=listItemsForExecution")
    .to("executionProcessor");
 }
}


What I wanted to do was synchronize the routes, so that the import and the execution never run at the same time. There are several ways to do this, for example using the Camel BAM module, or using a polling consumer and checking a flag. I came up with a solution that is logically almost the same as the one Claus proposed, but instead of an explicit flag I used a thread synchronization trick. Here's what the proof-of-concept code looks like.

The route builder:
package my.dummy.app;

import org.apache.camel.Processor;
import org.apache.camel.Exchange;
import org.apache.camel.spring.SpringRouteBuilder;

public class Route extends SpringRouteBuilder {
  public void configure() {
    from("timer:TriggerA?delay=100&period=1").to("A");
    from("timer:TriggerB?delay=100&period=1").to("B");
    from("timer:TriggerC?delay=100&period=1").to("C");
  }
}


Here we can see three concurrent routes, each triggered periodically by the timer component.

I created three dummy processors that emulate the business logic processors of the real application. Their only function is to create a delay, using Thread.sleep() with a random period, and to log a message.

package my.dummy.app;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.log4j.Logger;
import java.util.Random;

public class A /*B*/ /*C*/ implements Processor {
  Random r = new Random();
  private static final Logger log = Logger.getLogger(Processor.class);
  public void process(Exchange exchange) throws Exception {
    Thread.sleep(r.nextInt(1000));
    log.info("processing " + exchange + " in " + getClass().getName());
  }
}


The Spring configuration for this dummy application is as follows:
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://www.springframework.org/schema/beans" xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

<camelContext id="importing" xmlns="http://camel.apache.org/schema/spring">
  <packageScan>
     <package>my.dummy.app</package>
  </packageScan>
</camelContext>

<bean class="my.dummy.app.A" id="A"/>
<bean class="my.dummy.app.B" id="B"/>
<bean class="my.dummy.app.C" id="C"/>
</beans>

I noticed that Camel has a DelegateProcessor that can be used to wrap the real processors, so I can use it to synchronize the routes like this:

package my.dummy.app;

import org.apache.log4j.Logger;

import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.processor.DelegateProcessor;

public class RouteSynchronizer extends DelegateProcessor {
  private static final Logger log = Logger.getLogger(RouteSynchronizer.class);
  // One lock shared by every RouteSynchronizer instance, so all wrapped processors exclude each other.
  private static final Object sync = new Object();

  public void process(Exchange exchange) throws Exception {
    synchronized (sync) {
      log.debug("begin exchange processing by " + Thread.currentThread().getName());
      try {
        // Delegate to the wrapped processor while holding the shared lock.
        super.process(exchange);
        if (exchange.isFailed()) {
          throw new RuntimeCamelException(exchange.getException());
        }
      } finally {
        // Log the end even when the wrapped processor throws.
        log.debug("end exchange processing by " + Thread.currentThread().getName());
      }
    }
  }
}
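
The locking idea can be tried without Camel on the classpath. The following is only a sketch under stated assumptions: `Task` is a hypothetical stand-in for Camel's Processor, `Synchronizer` plays the role of the delegate, and `maxConcurrent` is a helper invented here to count how many wrapped tasks are ever inside their body at once.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-JDK illustration of the shared-lock delegate; none of these types come from Camel.
class SharedLockDemo {

  /** Hypothetical stand-in for org.apache.camel.Processor. */
  interface Task {
    void run() throws Exception;
  }

  /** Wraps a task so that every wrapped task takes the same class-wide lock. */
  static final class Synchronizer implements Task {
    private static final Object LOCK = new Object();
    private final Task delegate;

    Synchronizer(Task delegate) {
      this.delegate = delegate;
    }

    @Override
    public void run() throws Exception {
      synchronized (LOCK) {
        delegate.run();
      }
    }
  }

  /** Runs `routes` wrapped tasks `rounds` times each and returns the highest number seen running at once. */
  static int maxConcurrent(int routes, int rounds) {
    AtomicInteger active = new AtomicInteger();
    AtomicInteger max = new AtomicInteger();
    Task work = () -> {
      int now = active.incrementAndGet();
      max.accumulateAndGet(now, Math::max);
      Thread.sleep(2); // emulate some business logic
      active.decrementAndGet();
    };
    ExecutorService pool = Executors.newFixedThreadPool(routes);
    for (int i = 0; i < routes; i++) {
      Task wrapped = new Synchronizer(work);
      pool.submit(() -> {
        for (int r = 0; r < rounds; r++) {
          wrapped.run();
        }
        return null; // makes the lambda a Callable, which may throw checked exceptions
      });
    }
    pool.shutdown();
    try {
      pool.awaitTermination(30, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return max.get();
  }

  public static void main(String[] args) {
    System.out.println("max concurrent tasks: " + maxConcurrent(3, 20));
  }
}
```

Because the lock is static, it does not matter that each route gets its own wrapper instance: the counter should never exceed 1. Removing the `synchronized` block would let the three threads overlap.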


Now I can wrap my original processor beans with the brand-new delegate processor as follows:


<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://www.springframework.org/schema/beans" xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

<camelContext id="importing" xmlns="http://camel.apache.org/schema/spring">
  <packageScan>
    <package>my.dummy.app</package>
  </packageScan>
</camelContext>

<bean class="my.dummy.app.RouteSynchronizer" id="A">
  <property name="processor">
    <bean class="my.dummy.app.A"/>
  </property>
</bean>

<bean class="my.dummy.app.RouteSynchronizer" id="B">
  <property name="processor">
    <bean class="my.dummy.app.B"/>
  </property>
</bean>

<bean class="my.dummy.app.RouteSynchronizer" id="C">
  <property name="processor">
    <bean class="my.dummy.app.C"/>
  </property>
</bean>

</beans>

This solution carries a performance penalty due to the synchronization, but that is not critical for my application, and it conforms to the requirements.

Here's what the dummy application log looks like:
12-08-2009 15:06:01,970 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerB?delay=100&period=1
12-08-2009 15:06:02,298 INFO  Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.B
12-08-2009 15:06:02,313 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerB?delay=100&period=1
12-08-2009 15:06:02,313 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerC?delay=100&period=1
12-08-2009 15:06:03,032 INFO  Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.C
12-08-2009 15:06:03,032 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerC?delay=100&period=1
12-08-2009 15:06:03,032 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerA?delay=100&period=1
12-08-2009 15:06:03,173 INFO  Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.A
12-08-2009 15:06:03,173 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerA?delay=100&period=1
12-08-2009 15:06:03,173 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerB?delay=100&period=1
12-08-2009 15:06:03,579 INFO  Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.B
12-08-2009 15:06:03,579 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerB?delay=100&period=1
12-08-2009 15:06:03,579 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerC?delay=100&period=1
12-08-2009 15:06:04,251 INFO  Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.C
12-08-2009 15:06:04,251 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerC?delay=100&period=1
12-08-2009 15:06:04,251 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerA?delay=100&period=1
12-08-2009 15:06:04,626 INFO  Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.A
12-08-2009 15:06:04,626 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerA?delay=100&period=1
12-08-2009 15:06:04,626 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerB?delay=100&period=1
12-08-2009 15:06:05,251 INFO  Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.B
12-08-2009 15:06:05,251 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerB?delay=100&period=1
12-08-2009 15:06:05,251 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerC?delay=100&period=1
12-08-2009 15:06:05,688 INFO  Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.C
12-08-2009 15:06:05,688 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerC?delay=100&period=1
12-08-2009 15:06:05,688 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerA?delay=100&period=1
12-08-2009 15:06:05,782 INFO  Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.A
12-08-2009 15:06:05,782 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerA?delay=100&period=1
12-08-2009 15:06:05,782 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerB?delay=100&period=1
12-08-2009 15:06:06,298 INFO  Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.B
12-08-2009 15:06:06,298 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerB?delay=100&period=1


From the log we can see that the concept actually works: every "begin" is followed by its own "end" before the next route starts, so the routes never overlap.

What would be nice to have is the same support in the DSL, like:
from("...").synchronize().to("processorA");
from("...").synchronize().to("processorA");

Apache Camel: The File Component

A very common task for so-called batch processes is to read files from a directory, do some processing on them (which may require enriching the data with data from various sources), and store the result in a database.

Here's a file source endpoint definition:

String uri = "file:files" +
"?initialDelay=3000" +
"&delay=1000" +
"&useFixedDelay=true" +
"&include=.*[.]xml" +
"&move=backup/${file:name.noext}-${date:now:yyyyMMddHHmmssSSS}.xml" +
"&moveFailed=files/${file:name.noext}-${date:now:yyyyMMddHHmmssSSS}.xml.error";

So consider this route:

from(uri).process(someProcessor).to("someBean");

With the file endpoint definition above, we consume files from a directory called "files". The initial delay before polling starts is 3 seconds, after which we poll at a fixed interval of 1 second, filtering out non-XML files.
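
To see which names the include expression lets through, the regular expression can be checked with plain java.util.regex. This is a sketch, assuming the whole file name has to match and that matching is case-sensitive; the class and method names are made up for illustration and are not part of Camel.

```java
import java.util.regex.Pattern;

// Illustrative check of the include=.*[.]xml expression; not Camel code.
class IncludeFilterSketch {
  private static final Pattern INCLUDE = Pattern.compile(".*[.]xml");

  /** Returns true when the entire file name matches the include expression. */
  static boolean accepted(String fileName) {
    return INCLUDE.matcher(fileName).matches();
  }

  public static void main(String[] args) {
    for (String name : new String[] {"payment.xml", "payment.xml.error", "notes.txt"}) {
      System.out.println(name + " -> " + accepted(name));
    }
  }
}
```

Under these assumptions a name ending in ".xml.error" does not match, so a file renamed after a failure would not be picked up again by the same expression.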

In addition, if a file is processed successfully, it is moved to a "backup" directory, with a timestamp appended to the file name.
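
As a rough illustration of what the move and moveFailed expressions above evaluate to, here is a plain-Java approximation. It is not how Camel implements the file language: `nameNoExt`, `backupName` and `failedName` are hypothetical helpers, and the extension handling assumes single-extension names such as the .xml files in this example.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

// Approximates ${file:name.noext}-${date:now:yyyyMMddHHmmssSSS} for simple names; not Camel code.
class MoveNameSketch {

  /** Strips the last extension, e.g. "payment.xml" becomes "payment". */
  static String nameNoExt(String fileName) {
    int dot = fileName.lastIndexOf('.');
    return dot > 0 ? fileName.substring(0, dot) : fileName;
  }

  /** Formats the timestamp with the same pattern as in the endpoint URI. */
  static String stamp(Date now) {
    return new SimpleDateFormat("yyyyMMddHHmmssSSS", Locale.ROOT).format(now);
  }

  /** Target name for a successfully processed file (the move option). */
  static String backupName(String fileName, Date now) {
    return "backup/" + nameNoExt(fileName) + "-" + stamp(now) + ".xml";
  }

  /** Target name for a file whose processing failed (the moveFailed option). */
  static String failedName(String fileName, Date now) {
    return "files/" + nameNoExt(fileName) + "-" + stamp(now) + ".xml.error";
  }

  public static void main(String[] args) {
    Date now = new Date();
    System.out.println(backupName("payment.xml", now));
    System.out.println(failedName("payment.xml", now));
  }
}
```

The millisecond part of the pattern makes a name collision unlikely when files with the same name arrive repeatedly.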

A new feature is the moveFailed option, which, in case of a failure in someProcessor or in the target endpoint ("someBean"), lets you handle the failed files, e.g. by renaming them or moving them to another directory. At the time of writing, the feature is available in the snapshot version of Camel.
