Monday, November 28, 2016

Real-Time Data Batching with Apache Camel

Over the past few years, I've been working with high volume real-time message based applications.  As we scale these systems out, resource usage becomes increasingly important and various trade-offs have to be considered.

From a storage perspective, many of the NoSQL techs claim to handle mass volumes of parallel requests and data with ease.  In the real world, however, most techs have similar system resource limitations (threading, CPU, memory, I/O, etc) that all contribute to latency/stability issues.  Though many symptoms can be masked by horizontal the end of the day, the more efficiently your applications use resources, the better things scale out...

One approach is to batch process data to more efficiently use resources (threads/connections) by reducing per message/request overhead.  This is a widely used pattern for at-rest/bulk data processing, but not often considered for real-time/event driven processing.  As it turns out, this pattern is often a best practice (or even required) for reasonable performance from external systems and storage techs when working with high volume data.

I first encountered this use case when storing high volume messaging in Oracle.  The issue is that transaction commits are expensive and committing a single message at a time simply doesn't scale. Instead, we switched to aggregating messages in memory and then passed these to Oracle via an array.  The net result was dramatically reduced load against the database, negligible delays in processing data and a theoretical increased chance of message loss...overall, this addressed our performance issues with relatively little impact.

Then, as we moved into NoSQL solutions (Cassandra, HDFS, ElasticSearch, etc), I revisited the need for was obvious that the same batching strategy still applied to these new techs as well...

Luckily, we are working with Apache Camel and its implementation of EIPs (Enterprise Integration Patterns) make solving these types of issues fairly straight forward.  In particular, the split and aggregator patterns are designed for just this type of message flow.

For example, let's say I have 3 systems that process messages...A, B, C.  System A produces messages and sends them to system B.  system B does some processing and then sends them to system C for final processing.

in Camel, this is expressed by the following simple route (assuming ActiveMQ is used as a message bus in between systems)

now, if we find that system B requires batch sizes of 100.  I can easily batch these together using a simple aggregator as follows...

Given the config above, we'll pass on a batch after 100 messages are aggregated (completionSize) OR after 1000ms (completionTimeout)...the latter is key to limiting the delay in processing when volume is low.  also note, the above would pass on a batch size of 100 to system C...
Now, let's assume system C prefers a batch size of 10 and must get groups for the same are the changes required

notice that in the system C route, I now call split(body()) which will split the incoming batch from system B and allow us to re-aggregate per the requirements for this system.  Also, the header("accountId") in place of constant(true) tells the aggregator to build batches/groups for similar accountId values only and still complete after 10 are received or after 1000ms...

As you can see, these are very flexible and easy to implement patterns for message flow.  That said, you do have to give some consideration to memory/CPU usage and overall message reliability requirements when using these.

Overall, I've used this pattern to successfully scale out high volume requests to Oracle, Cassandra, HDFS and ElasticSearch...

Wednesday, June 18, 2014

ApacheCon 2014 Presentation - Apache Camel Business Patterns

A few months back I had the opportunity to present at ApacheCon in Denver, CO.  While the conference was small, I had the chance to meet some interesting people in the community and learn about some other Apache projects.

My presentation was an introduction to how to use Camel to address common business problems (common to my experience anyways).

Here is a link to my presentation slides and the audio, feedback is welcome...

Tuesday, November 8, 2011

Creating A Custom Camel Component

While Camel supports an ever growing number of components, you might have a need to create a custom component. This could be to either promote reuse across projects, customize an existing component or provide a simplified interface to an existing system. Whatever the reason, here is an overview of the options that are available within the Camel framework...

first, consider just creating a Bean or Processor

Before you jump in and create a component, consider just creating a simple class to handle your custom logic. Behind the scenes, all components are just Processors with a bunch of lifecycle support around them.  Beans and Processors are simple, streamlined and easy to manage.

using a Bean...

public class MyBean {
    public void doSomething(Exchange exchange) {
      //do something...

using a Processor...

from(uri).process(new MyProcessor());
public class MyProcessor implements Processor {
    public void process(Exchange exchange) throws Exception {
        //do something... 

create a custom component

If you decide to go down this route, you should start by start by using a Maven archetype to stub out a new component project for you.

mvn archetype:generate
This will create a new Maven component project that contains an example HelloWorld component as seen here...

The following core classes are created and have the following responsibilities:

  • HelloWorldComponent
    • endpoint factory which implements createEndpoint()
  • HelloWorldEndpoint
    • producer/consumer factory which implements createConsumer(), createProducer(), createExchange()
  • HelloWorldConsumer
    • acts as a service to consumes request at the start of a route
  • HelloWorldProducer
    • acts as a service consumer to dispatch outgoing requests and receive incoming replies
  • Exchange
    • encapsulate the in/out message payloads and meta data about the data flowing between endpoints
  • Message
    • represent the message payload
    • their is an IN and OUT message for each exchange 
So, how do all these classes/method actually work?  The best way to get your head around this is to load the project into Eclipse (or IntelliJ) and debug the unit test.  This will allow you to step into the route initialization and message processing to trace the flow.

Consumer Lifecycle

When you define a route that uses your new component as a consumer, like this

It does the following:
  • creates a HelloWorldComponent instance (one per CamelContext)
  • calls HelloWorldComponent createEndpoint() with the given URI
  • creates a HelloWorldEndpoint instance (one per route reference)
  • creates a HelloWorldConsumer instance (one per route reference)
  • register the route with the CamelContext and call doStart() on the Consumer
  • consumers will then start in one of the following modes:
    • event driven - wait for message to trigger route
    • polling consumer - manually polls a resource for events
    • scheduled polling consumer - events automatically generated by timer
    • custom threading - custom management of the event lifecyle

Producer Lifecycle

When you define a route that uses your new component as a producer, like this
It does the following:
  • creates a HelloWorldComponent instance (one per CamelContext)
  • calls HelloWorldComponent createEndpoint() with the given URI
  • creates a HelloWorldEndpoint instance (one per route reference)
  • creates a HelloWorldProducer instance (one per route reference)
  • register the route with the CamelContext and start the route consumer
  • the Producer's process(Exchange) method is then executed
    • generally, this will decorate the Exchange by interfacing with some external resource (file, jms, database, etc)
Other Resources

Friday, August 19, 2011

Performance Monitoring With Spring AOP

If you are using Spring to access/configure resources (DAOs/services), then you might as well add some basic performance monitoring while you are at it.  This is a trivial task with Spring AOP and doesn't require any changes to existing code, just some simple configuration. 

First, you need to include the spring-aop, aspectj and cglib libraries.  If you are using Maven, simply include the following dependencies...


Next, identify what needs monitoring and put the AOP hooks in place.  Generally, this just requires adding a pointcut and advisor configuration in your existing Spring XML configuration file.  This configuration will add method response time logging to all methods in the "" package.  Note: these classes must be instantiated with the Spring context...otherwise, the AOP hooks will not be executed.

    <bean id="performanceMonitor"
              class="org.springframework.aop.interceptor.PerformanceMonitorInterceptor" />

        <aop:pointcut id="allServiceMethods" expression="execution(**.*(..))"/>
        <aop:advisor pointcut-ref="allServiceMethods" advice-ref="performanceMonitor" order="2"/>

Next, you need to setup your logging (log4j, etc) to enable TRACE on the interceptor class.

    <logger name="org.springframework.aop.interceptor.PerformanceMonitorInterceptor" additivity="false">
        <level value="TRACE"/>
        <appender-ref ref="STDOUT"/>

That's it, now when you run your application, you will see the following logging...

TRACE PerformanceMonitorInterceptor  - StopWatch 'PerfTestService.processRequest': running time (millis) = 1322
TRACE PerformanceMonitorInterceptor  - StopWatch 'PerfTestService.processRequest': running time (millis) = 98
TRACE PerformanceMonitorInterceptor  - StopWatch 'PerfTestService.processRequest': running time (millis) = 1764

This is a some great raw data, but unfortunately is not very useful on its own.  Its for every method call and doesn't provide any other stats.  This quickly clutters up the log and without some way to process/aggregate the log entries, its hard to make sense out of it.  So, unless you plan of writing some log parsing or using 3rd party software (like Splunk or Cati), then you really should do some processing of the data before writing it to the log file.

One easy way to do this is to just write a simple interceptor class to use instead of the Spring default one (PerformanceMonitorInterceptor).  Below is an example of this that provides periodic stats (last, average and greatest response time) as well as warning whenever a method response time exceeds a configured threshold.

By default, it will log stats every 10 method calls and log a warning message anytime a method response time exceeds 1000ms.

public class PerfInterceptor implements MethodInterceptor {

     Logger logger = LoggerFactory.getLogger(PerfInterceptor.class.getName());
    private static ConcurrentHashMap<String, MethodStats> methodStats = new ConcurrentHashMap<String, MethodStats>();
    private static long statLogFrequency = 10;
    private static long methodWarningThreshold = 1000;
    public Object invoke(MethodInvocation method) throws Throwable {
        long start = System.currentTimeMillis();
        try {
            return method.proceed();
        finally {
            updateStats(method.getMethod().getName(),(System.currentTimeMillis() - start));

    private void updateStats(String methodName, long elapsedTime) {
        MethodStats stats = methodStats.get(methodName);
        if(stats == null) {
            stats = new MethodStats(methodName);
        stats.totalTime += elapsedTime;
        if(elapsedTime > stats.maxTime) {
            stats.maxTime = elapsedTime;
        if(elapsedTime > methodWarningThreshold) {
            logger.warn("method warning: " + methodName + "(), cnt = " + stats.count + ", lastTime = " + elapsedTime + ", maxTime = " + stats.maxTime);
        if(stats.count % statLogFrequency == 0) {
            long avgTime = stats.totalTime / stats.count;
            long runningAvg = (stats.totalTime-stats.lastTotalTime) / statLogFrequency;
            logger.debug("method: " + methodName + "(), cnt = " + stats.count + ", lastTime = " + elapsedTime + ", avgTime = " + avgTime + ", runningAvg = " + runningAvg + ", maxTime = " + stats.maxTime);
            //reset the last total time
            stats.lastTotalTime = stats.totalTime;   
    class MethodStats {
        public String methodName;
        public long count;
        public long totalTime;
        public long lastTotalTime;
        public long maxTime;
        public MethodStats(String methodName) {
            this.methodName = methodName;

Now, you just need to wire this into your app by referencing this class in your Spring xml and logging config.  When you run your app, you will see stats like this...

WARN  PerfInterceptor - method warning: processRequest(), cnt = 10, lastTime = 1072, maxTime = 1937
TRACE PerfInterceptor - method: processRequest(), cnt = 10, lastTime = 1072, avgTime = 1243, runningAvg = 1243, maxTime = 1937
WARN  PerfInterceptor - method warning: processRequest(), cnt = 20, lastTime = 1466, maxTime = 1937
TRACE PerfInterceptor - method: processRequest(), cnt = 20, lastTime = 1466, avgTime = 1067, runningAvg = 892, maxTime = 1937

As you can see, these stats can provide valuable feedback about class/method performance with very little effort and without modifying any existing Java code.  This information can easily be used to find bottlenecks in your application (generally database or threading related, etc)...good luck

This page has been translated into Spanish language by Maria Ramos from

Friday, April 8, 2011

Camel ActiveMQ Performance Test

Here is a simple unit test (extends CamelTestSupport) to get a feel for how quickly Camel routes add/remove from a JMS queue.  This should give you a ballpark latency estimate (~5ms for my setup).  You can also get some great AMQ performance stats via JMX to monitor an active system.

However, results will vary dramatically depending on thread and AMQ performance/QoS configurations. Refer to the AMQ performance page and the camel-jms page for more information...

private static final Logger logger = Logger.getLogger(AMQRouteTest.class.getName());
@EndpointInject(uri = "mock:mock")
protected MockEndpoint mock;

protected CamelContext createCamelContext() throws Exception {
       CamelContext camelContext = super.createCamelContext();
       String url ="vm://test-broker?broker.persistent=false&broker.useJmx=false";
       ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
       return camelContext;

 public void test() throws Exception {
       int messageCnt = 10000, poolSize = 5;

       ExecutorService executor = Executors.newFixedThreadPool(poolSize);

       for (int i = 0; i < messageCnt; i++) {
           executor.submit(new Callable() {
               public Object call() throws Exception {
                   return null;

 protected RouteBuilder createRouteBuilder() throws Exception {
      return new RouteBuilder() {
             public void configure() throws Exception {
                     .process(new Processor() {
                            long totalLatency, msgCnt;
                            public void process(Exchange exch) throws Exception {
                                   totalLatency += (System.currentTimeMillis() - exch.getIn().getBody(Long.class));
                                   if(++msgCnt % 1000 == 0) {
                                 "avgLatency=" + (totalLatency/msgCnt));

Monday, January 3, 2011

Apache Camel Monitoring

I've seen a lot of discussion about how to monitor Camel based applications.  Most people are looking for the following features: ability to view services (contexts, endpoints, routes), to view performance statistics (route throughput, etc) and to perform basic operations (start/stop routes, send messages, etc).

This post will breakdown the options (that I know of) that are available today (as of Camel 2.8).  If you have used other approaches or know of other ongoing development in this area, please let me know.


Camel uses JMX to provide a standardized way to access metadata about contexts/routes/endpoints defined in a given application.  Also, you can use JMX to interact with these components (start/stop routes, etc) in some interesting ways.

I recently had some very specific Camel/ActiveMQ monitoring requests from a client.  After looking at the options, we ended up building a standalone Tomcat web app that used JSPs, jQuery, Ajax and JMX APIs to view route/endpoint statistics, manage Camel routes (stop, start, etc) and monitor/manipulate ActiveMQ queues.  It provided some much needed visibility and management features for our Camel/ActiveMQ based message processing application...


If you have a handle to the CamelContext, there are various APIs that can help describe and manage routes and endpoints.  These are used by the existing Camel Web Console and can be used to build custom interface to retrieve and use this information in various ways...

here are some of the notable APIs...


With a little creativity, you can use these APIs to manage/monitor and re-wire a Camel application dynamically.

Camel Web Console

This console provides web and REST interfaces to Camel contexts/routes/endpoints and allows you to view/manage endpoints/routes, send messages to endpoints, viewing route statistics, etc.

That being said, using this web console with an existing Camel application is tricky at the moment.  It's currently deployed as a war file that only has access to the CamelContext defined in its embedded spring XML file.  Though the entire camel-web project can be embedded and customized in your application if you desire (and know Scalate).  Given my recent client requirements, I opted to build my own basic app using JSPs/JMX as described above.

There has been some recent support for deploying this console in OSGI, where it should be able to view any CamelContexts deployed in the container, etc.  However, I'm yet to see this work...more on this later.

Using Camel APIs

There are also a number of Camel technologies/patterns that can be used to add monitoring to existing routes.

  • wire tap - can add message logging (to a file or JMS queue/topic, etc) or other inline processing
  • advicewith - can be used to modify existing routes to apply before/after operations or add/remove operations in a route
  • intercept - can be used to intercept Exchanges while they are in route, can apply to all endpoints, certain endpoints or just starting endpoints
  • BrowsableEndpoint - is an interface which Endpoints may implement to support the browsing of the exchanges which are pending or have been sent on it.
That being said, it takes some creativity to use these effectively and caution to not adversely affect the routes you are trying to monitor.

 Hyperic HQ

You can use this tool to monitor Servicemix (or any process), but it more geared towards system monitoring and JVM stats.  I didn't find it useful for any Camel specific monitoring. 

This is a newer tool that has plugins for ActiveMQ, Camel, etc to expose some great information and allow your to perform JMX operations.


these are standard JMX based consoles.  They aren't web based and can't be customized (easily anyways) to provide anything more than a tree-like view of JMX MBeans.  If you know where to look though, you can do a lot with it.


These are just some quick notes at this point.  As I learn about other ways of monitoring Camel, I'll update this list and give some more detailed comparison.  Any comments are welcome...

Sunday, December 19, 2010

basic REST service in Apache CXF vs. Camel-CXF

This article demonstrates how to create/test a basic REST service in CXF vs. Camel-CXF. Given the range of configuration and deployment options, I'm focusing on building a basic OSGi bundle that can be deployed in Fuse 4.2 (ServiceMix)...basic knowledge of Maven, ServiceMix and Camel are assumed.

Apache CXF

For more details, see

Here is an overview of the steps to get a basic example running...

1. add dependencies to your pom.xml


2. setup the bundle-context.xml file

    <import resource="classpath:META-INF/cxf/cxf.xml" />
    <import resource="classpath:META-INF/cxf/cxf-extension-jaxrs-binding.xml" />
    <import resource="classpath:META-INF/cxf/cxf-extension-http.xml" />
    <import resource="classpath:META-INF/cxf/cxf-extension-http-jetty.xml" />

    <bean id="exampleBean" class="com.example.ExampleBean" />

    <jaxrs:server id="exampleService" address="http://localhost:9000/">
            <ref bean="exampleBean" />

3. create a service bean class

public class ExampleBean {

    public String ping() throws Exception {
        return "SUCCESS";

4. deploy and test

  build the bundle using "mvn install"
  start servicemix
  deploy the bundle
  open a browser to "http://localhost:9000/example" (should see "SUCCESS")


For details, see

Here is an overview of the steps to get a basic example running...

1. add dependencies to your pom.xml



2. setup the bundle-context.xml file

    <camelContext trace="true" id="camelContext" xmlns="">

3. create a RouteBuilder class

public class ExampleRouter extends RouteBuilder {

    public void configure() throws Exception {

        from("cxfrs://http://localhost:9000?resourceClasses=" + ExampleResource.class.getName())
            .process(new Processor() {
                public void process(Exchange exchange) throws Exception {
                    //custom processing here

4. create a REST Resource class

public class ExampleResource {

    public void ping() {
        //strangely, this method is not called, only serves to configure the endpoint

5.  deploy and test

  build bundle using "mvn install"
  start servicemix
  deploy the bundle
  open a browser to "http://localhost:9000/example" (should see "SUCCESS")

Unit Testing

To perform basic unit testing for either of these approaches, use the Apache HttpClient APIs by first adding this dependency to your pom.xml...


Then, you can use these APIs to create a basic test to validate the REST services created above...

        String url = "http://localhost:9000/example";
        HttpGet httpGet = new HttpGet(url);
        HttpClient httpclient = new DefaultHttpClient();
        HttpResponse response = httpclient.execute(httpGet);
        String responseMessage = EntityUtils.toString(response.getEntity());
        assertEquals("SUCCESS", responseMessage);
        assertEquals(200, response.getStatusLine().getStatusCode());


Overall, the approaches are very similar, but you can use various combinations of Spring XML and Java APIs to set this up.  I focused on a common approach to demonstrate the basics of each approach side-by-side.

That being said, if you have requirements for complex REST services (security, interceptors, filters, etc), I recommend grabbing a copy of Apache CXF Web Service Development and following some of the more complex examples on the Apache CXF, Camel-CXFRS pages.

In practice, I've generally used Camel-CXF because it gives you the flexibility of integrating with other Camel components and allows you to leverage the rich routing features of Camel.  I hope to cover more complex scenarios in future posts...