Friday, April 8, 2011

Camel ActiveMQ Performance Test

Here is a simple unit test (extends CamelTestSupport) to get a feel for how quickly Camel routes add/remove from a JMS queue.  This should give you a ballpark latency estimate (~5ms for my setup).  You can also get some great AMQ performance stats via JMX to monitor an active system.

However, results will vary dramatically depending on thread and AMQ performance/QoS configurations. Refer to the AMQ performance page and the camel-jms page for more information...

private static final Logger logger = Logger.getLogger(AMQRouteTest.class.getName());
@EndpointInject(uri = "mock:mock")
protected MockEndpoint mock;

protected CamelContext createCamelContext() throws Exception {
       CamelContext camelContext = super.createCamelContext();
       String url ="vm://test-broker?broker.persistent=false&broker.useJmx=false";
       ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
       camelContext.addComponent("activemq", 
                                                     JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
       return camelContext;
   }

@Test
 public void test() throws Exception {
       int messageCnt = 10000, poolSize = 5;
       mock.setMinimumExpectedMessageCount(messageCnt);

       ExecutorService executor = Executors.newFixedThreadPool(poolSize);

       for (int i = 0; i < messageCnt; i++) {
           executor.submit(new Callable() {
               public Object call() throws Exception {
                   template.sendBody("activemq:queue:test",System.currentTimeMillis());
                   return null;
               }
           });
       }
       mock.assertIsSatisfied();
}

@Override
 protected RouteBuilder createRouteBuilder() throws Exception {
      return new RouteBuilder() {
            @Override
             public void configure() throws Exception {
                     from("activemq:queue:test?concurrentConsumers=10")
                     .process(new Processor() {
                            long totalLatency, msgCnt;
                            public void process(Exchange exch) throws Exception {
                                   totalLatency += (System.currentTimeMillis() - exch.getIn().getBody(Long.class));
                                   if(++msgCnt % 1000 == 0) {
                                           logger.info("avgLatency=" + (totalLatency/msgCnt));
                                  }
                           }
                     })
                     .to("mock:mock");
          }
  };
}