Tuesday, November 8, 2011

Creating A Custom Camel Component

While Camel supports an ever growing number of components, you might have a need to create a custom component. This could be to either promote reuse across projects, customize an existing component or provide a simplified interface to an existing system. Whatever the reason, here is an overview of the options that are available within the Camel framework...

first, consider just creating a Bean or Processor

Before you jump in and create a component, consider just creating a simple class to handle your custom logic. Behind the scenes, all components are just Processors with a bunch of lifecycle support around them.  Beans and Processors are simple, streamlined and easy to manage.

using a Bean...

from(uri).bean(MyBean.class);
... 
public class MyBean {
    public void doSomething(Exchange exchange) {
      //do something...
   }
}

using a Processor...

from(uri).process(new MyProcessor());
... 
public class MyProcessor implements Processor {
    public void process(Exchange exchange) throws Exception {
        //do something... 
    }
}

create a custom component

If you decide to go down this route, you should start by start by using a Maven archetype to stub out a new component project for you.

mvn archetype:generate
    -DarchetypeGroupId=org.apache.camel.archetypes
    -DarchetypeArtifactId=camel-archetype-component
    -DarchetypeVersion=2.9.2
    -DarchetypeRepository=https://repository.apache.org/content/groups/snapshots-group
    -DgroupId=org.apache.camel.component
    -DartifactId=camel-ben
 
This will create a new Maven component project that contains an example HelloWorld component as seen here...
 
 
 





The following core classes are created and have the following responsibilities:

  • HelloWorldComponent
    • endpoint factory which implements createEndpoint()
  • HelloWorldEndpoint
    • producer/consumer factory which implements createConsumer(), createProducer(), createExchange()
  • HelloWorldConsumer
    • acts as a service to consumes request at the start of a route
  • HelloWorldProducer
    • acts as a service consumer to dispatch outgoing requests and receive incoming replies
  • Exchange
    • encapsulate the in/out message payloads and meta data about the data flowing between endpoints
  • Message
    • represent the message payload
    • their is an IN and OUT message for each exchange 
So, how do all these classes/method actually work?  The best way to get your head around this is to load the project into Eclipse (or IntelliJ) and debug the unit test.  This will allow you to step into the route initialization and message processing to trace the flow.

Consumer Lifecycle

When you define a route that uses your new component as a consumer, like this
from("helloworld:foo").to("log:result");

It does the following:
  • creates a HelloWorldComponent instance (one per CamelContext)
  • calls HelloWorldComponent createEndpoint() with the given URI
  • creates a HelloWorldEndpoint instance (one per route reference)
  • creates a HelloWorldConsumer instance (one per route reference)
  • register the route with the CamelContext and call doStart() on the Consumer
  • consumers will then start in one of the following modes:
    • event driven - wait for message to trigger route
    • polling consumer - manually polls a resource for events
    • scheduled polling consumer - events automatically generated by timer
    • custom threading - custom management of the event lifecyle

Producer Lifecycle

When you define a route that uses your new component as a producer, like this
from("direct:start").to("helloworld:foo");
It does the following:
  • creates a HelloWorldComponent instance (one per CamelContext)
  • calls HelloWorldComponent createEndpoint() with the given URI
  • creates a HelloWorldEndpoint instance (one per route reference)
  • creates a HelloWorldProducer instance (one per route reference)
  • register the route with the CamelContext and start the route consumer
  • the Producer's process(Exchange) method is then executed
    • generally, this will decorate the Exchange by interfacing with some external resource (file, jms, database, etc)
Other Resources 
http://camel.apache.org/writing-components.html
http://fusesource.com/docs/mirrors/camel/developers/writing-components.html
http://fusesource.com/docs/mirrors/camel/documentation/user-guide/creating-a-new-camel-component.html
 

Friday, August 19, 2011

Performance Monitoring With Spring AOP

If you are using Spring to access/configure resources (DAOs/services), then you might as well add some basic performance monitoring while you are at it.  This is a trivial task with Spring AOP and doesn't require any changes to existing code, just some simple configuration. 

First, you need to include the spring-aop, aspectj and cglib libraries.  If you are using Maven, simply include the following dependencies...

        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
            <version>1.5.4</version>
        </dependency>
        <dependency>
            <groupId>cglib</groupId>
            <artifactId>cglib-nodep</artifactId>
            <version>2.2</version>
        </dependency>
        <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-aop</artifactId>
          <version>2.5.6</version>
        </dependency>


Next, identify what needs monitoring and put the AOP hooks in place.  Generally, this just requires adding a pointcut and advisor configuration in your existing Spring XML configuration file.  This configuration will add method response time logging to all methods in the "com.mycompany.services" package.  Note: these classes must be instantiated with the Spring context...otherwise, the AOP hooks will not be executed.

    <bean id="performanceMonitor"
              class="org.springframework.aop.interceptor.PerformanceMonitorInterceptor" />

    <aop:config>
        <aop:pointcut id="allServiceMethods" expression="execution(* com.mycompany.services.*.*(..))"/>
        <aop:advisor pointcut-ref="allServiceMethods" advice-ref="performanceMonitor" order="2"/>
    </aop:config>

Next, you need to setup your logging (log4j, etc) to enable TRACE on the interceptor class.

    <logger name="org.springframework.aop.interceptor.PerformanceMonitorInterceptor" additivity="false">
        <level value="TRACE"/>
        <appender-ref ref="STDOUT"/>
    </logger>

That's it, now when you run your application, you will see the following logging...

TRACE PerformanceMonitorInterceptor  - StopWatch 'PerfTestService.processRequest': running time (millis) = 1322
TRACE PerformanceMonitorInterceptor  - StopWatch 'PerfTestService.processRequest': running time (millis) = 98
TRACE PerformanceMonitorInterceptor  - StopWatch 'PerfTestService.processRequest': running time (millis) = 1764


This is a some great raw data, but unfortunately is not very useful on its own.  Its for every method call and doesn't provide any other stats.  This quickly clutters up the log and without some way to process/aggregate the log entries, its hard to make sense out of it.  So, unless you plan of writing some log parsing or using 3rd party software (like Splunk or Cati), then you really should do some processing of the data before writing it to the log file.


One easy way to do this is to just write a simple interceptor class to use instead of the Spring default one (PerformanceMonitorInterceptor).  Below is an example of this that provides periodic stats (last, average and greatest response time) as well as warning whenever a method response time exceeds a configured threshold.

By default, it will log stats every 10 method calls and log a warning message anytime a method response time exceeds 1000ms.

public class PerfInterceptor implements MethodInterceptor {

     Logger logger = LoggerFactory.getLogger(PerfInterceptor.class.getName());
    private static ConcurrentHashMap<String, MethodStats> methodStats = new ConcurrentHashMap<String, MethodStats>();
    private static long statLogFrequency = 10;
    private static long methodWarningThreshold = 1000;
   
    public Object invoke(MethodInvocation method) throws Throwable {
        long start = System.currentTimeMillis();
        try {
            return method.proceed();
        }
        finally {
            updateStats(method.getMethod().getName(),(System.currentTimeMillis() - start));
        }
    }

    private void updateStats(String methodName, long elapsedTime) {
        MethodStats stats = methodStats.get(methodName);
        if(stats == null) {
            stats = new MethodStats(methodName);
            methodStats.put(methodName,stats);
        }
        stats.count++;
        stats.totalTime += elapsedTime;
        if(elapsedTime > stats.maxTime) {
            stats.maxTime = elapsedTime;
        }
       
        if(elapsedTime > methodWarningThreshold) {
            logger.warn("method warning: " + methodName + "(), cnt = " + stats.count + ", lastTime = " + elapsedTime + ", maxTime = " + stats.maxTime);
        }
       
        if(stats.count % statLogFrequency == 0) {
            long avgTime = stats.totalTime / stats.count;
            long runningAvg = (stats.totalTime-stats.lastTotalTime) / statLogFrequency;
            logger.debug("method: " + methodName + "(), cnt = " + stats.count + ", lastTime = " + elapsedTime + ", avgTime = " + avgTime + ", runningAvg = " + runningAvg + ", maxTime = " + stats.maxTime);
           
            //reset the last total time
            stats.lastTotalTime = stats.totalTime;   
        }
    }
   
    class MethodStats {
        public String methodName;
        public long count;
        public long totalTime;
        public long lastTotalTime;
        public long maxTime;
       
        public MethodStats(String methodName) {
            this.methodName = methodName;
        }
    } 
}


Now, you just need to wire this into your app by referencing this class in your Spring xml and logging config.  When you run your app, you will see stats like this...

WARN  PerfInterceptor - method warning: processRequest(), cnt = 10, lastTime = 1072, maxTime = 1937
TRACE PerfInterceptor - method: processRequest(), cnt = 10, lastTime = 1072, avgTime = 1243, runningAvg = 1243, maxTime = 1937
WARN  PerfInterceptor - method warning: processRequest(), cnt = 20, lastTime = 1466, maxTime = 1937
TRACE PerfInterceptor - method: processRequest(), cnt = 20, lastTime = 1466, avgTime = 1067, runningAvg = 892, maxTime = 1937

As you can see, these stats can provide valuable feedback about class/method performance with very little effort and without modifying any existing Java code.  This information can easily be used to find bottlenecks in your application (generally database or threading related, etc)...good luck

This page has been translated into Spanish language by Maria Ramos from Webhostinghub.com

Friday, April 8, 2011

Camel ActiveMQ Performance Test

Here is a simple unit test (extends CamelTestSupport) to get a feel for how quickly Camel routes add/remove from a JMS queue.  This should give you a ballpark latency estimate (~5ms for my setup).  You can also get some great AMQ performance stats via JMX to monitor an active system.

However, results will vary dramatically depending on thread and AMQ performance/QoS configurations. Refer to the AMQ performance page and the camel-jms page for more information...

private static final Logger logger = Logger.getLogger(AMQRouteTest.class.getName());
@EndpointInject(uri = "mock:mock")
protected MockEndpoint mock;

protected CamelContext createCamelContext() throws Exception {
       CamelContext camelContext = super.createCamelContext();
       String url ="vm://test-broker?broker.persistent=false&broker.useJmx=false";
       ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
       camelContext.addComponent("activemq", 
                                                     JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
       return camelContext;
   }

@Test
 public void test() throws Exception {
       int messageCnt = 10000, poolSize = 5;
       mock.setMinimumExpectedMessageCount(messageCnt);

       ExecutorService executor = Executors.newFixedThreadPool(poolSize);

       for (int i = 0; i < messageCnt; i++) {
           executor.submit(new Callable() {
               public Object call() throws Exception {
                   template.sendBody("activemq:queue:test",System.currentTimeMillis());
                   return null;
               }
           });
       }
       mock.assertIsSatisfied();
}

@Override
 protected RouteBuilder createRouteBuilder() throws Exception {
      return new RouteBuilder() {
            @Override
             public void configure() throws Exception {
                     from("activemq:queue:test?concurrentConsumers=10")
                     .process(new Processor() {
                            long totalLatency, msgCnt;
                            public void process(Exchange exch) throws Exception {
                                   totalLatency += (System.currentTimeMillis() - exch.getIn().getBody(Long.class));
                                   if(++msgCnt % 1000 == 0) {
                                           logger.info("avgLatency=" + (totalLatency/msgCnt));
                                  }
                           }
                     })
                     .to("mock:mock");
          }
  };
}


Monday, January 3, 2011

Apache Camel Monitoring

I've seen a lot of discussion about how to monitor Camel based applications.  Most people are looking for the following features: ability to view services (contexts, endpoints, routes), to view performance statistics (route throughput, etc) and to perform basic operations (start/stop routes, send messages, etc).

This post will breakdown the options (that I know of) that are available today (as of Camel 2.8).  If you have used other approaches or know of other ongoing development in this area, please let me know.

JMX APIs

Camel uses JMX to provide a standardized way to access metadata about contexts/routes/endpoints defined in a given application.  Also, you can use JMX to interact with these components (start/stop routes, etc) in some interesting ways.

I recently had some very specific Camel/ActiveMQ monitoring requests from a client.  After looking at the options, we ended up building a standalone Tomcat web app that used JSPs, jQuery, Ajax and JMX APIs to view route/endpoint statistics, manage Camel routes (stop, start, etc) and monitor/manipulate ActiveMQ queues.  It provided some much needed visibility and management features for our Camel/ActiveMQ based message processing application...

CamelContext

If you have a handle to the CamelContext, there are various APIs that can help describe and manage routes and endpoints.  These are used by the existing Camel Web Console and can be used to build custom interface to retrieve and use this information in various ways...

here are some of the notable APIs...

getRouteDefinitions()
getEndpoints()
getEndpointsMap()
getRouteStatus(routeId)
startRoute(routeId)
stopRoute(routeId)
removeRoute(routeId)
addRoutes(routeBuilder)
suspendRoute(routeId)
resumeRoute(routeId)

With a little creativity, you can use these APIs to manage/monitor and re-wire a Camel application dynamically.

Camel Web Console

This console provides web and REST interfaces to Camel contexts/routes/endpoints and allows you to view/manage endpoints/routes, send messages to endpoints, viewing route statistics, etc.

That being said, using this web console with an existing Camel application is tricky at the moment.  It's currently deployed as a war file that only has access to the CamelContext defined in its embedded spring XML file.  Though the entire camel-web project can be embedded and customized in your application if you desire (and know Scalate).  Given my recent client requirements, I opted to build my own basic app using JSPs/JMX as described above.

There has been some recent support for deploying this console in OSGI, where it should be able to view any CamelContexts deployed in the container, etc.  However, I'm yet to see this work...more on this later.

Using Camel APIs

There are also a number of Camel technologies/patterns that can be used to add monitoring to existing routes.

  • wire tap - can add message logging (to a file or JMS queue/topic, etc) or other inline processing
  • advicewith - can be used to modify existing routes to apply before/after operations or add/remove operations in a route
  • intercept - can be used to intercept Exchanges while they are in route, can apply to all endpoints, certain endpoints or just starting endpoints
  • BrowsableEndpoint - is an interface which Endpoints may implement to support the browsing of the exchanges which are pending or have been sent on it.
That being said, it takes some creativity to use these effectively and caution to not adversely affect the routes you are trying to monitor.

 Hyperic HQ

You can use this tool to monitor Servicemix (or any process), but it more geared towards system monitoring and JVM stats.  I didn't find it useful for any Camel specific monitoring. 

HAWTIO - http://hawt.io/
This is a newer tool that has plugins for ActiveMQ, Camel, etc to expose some great information and allow your to perform JMX operations.

jConsole/VisualVM

these are standard JMX based consoles.  They aren't web based and can't be customized (easily anyways) to provide anything more than a tree-like view of JMX MBeans.  If you know where to look though, you can do a lot with it.

summary

These are just some quick notes at this point.  As I learn about other ways of monitoring Camel, I'll update this list and give some more detailed comparison.  Any comments are welcome...