Showing posts with label activemq. Show all posts

Friday, April 8, 2011

Camel ActiveMQ Performance Test

Here is a simple unit test (it extends CamelTestSupport) to get a feel for how quickly Camel routes can put messages on, and take them off, a JMS queue. It should give you a ballpark latency estimate (~5 ms on my setup). You can also get some great AMQ performance stats via JMX to monitor an active system.

However, results will vary dramatically depending on threading and AMQ performance/QoS configuration. Refer to the AMQ performance page and the camel-jms page for more information...

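// imports assume Camel 2.x with JUnit 4 and java.util.logging
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.EndpointInject;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;

public class AMQRouteTest extends CamelTestSupport {

    private static final Logger logger = Logger.getLogger(AMQRouteTest.class.getName());

    @EndpointInject(uri = "mock:mock")
    protected MockEndpoint mock;

    @Override
    protected CamelContext createCamelContext() throws Exception {
        CamelContext camelContext = super.createCamelContext();
        // embedded, non-persistent broker so the test measures routing rather than disk I/O
        String url = "vm://test-broker?broker.persistent=false&broker.useJmx=false";
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        camelContext.addComponent("activemq",
                JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
        return camelContext;
    }

    @Test
    public void test() throws Exception {
        int messageCnt = 10000, poolSize = 5;
        mock.setMinimumExpectedMessageCount(messageCnt);

        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        try {
            for (int i = 0; i < messageCnt; i++) {
                // the message body is the send timestamp, so the consumer can compute latency
                executor.execute(() ->
                        template.sendBody("activemq:queue:test", System.currentTimeMillis()));
            }
            mock.assertIsSatisfied();
        } finally {
            executor.shutdown(); // don't leak the producer threads
        }
    }

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("activemq:queue:test?concurrentConsumers=10")
                    .process(new Processor() {
                        // shared by 10 consumer threads, so plain long counters would lose updates
                        private final AtomicLong totalLatency = new AtomicLong();
                        private final AtomicLong msgCnt = new AtomicLong();

                        public void process(Exchange exch) throws Exception {
                            long latency = System.currentTimeMillis() - exch.getIn().getBody(Long.class);
                            long total = totalLatency.addAndGet(latency);
                            long count = msgCnt.incrementAndGet();
                            if (count % 1000 == 0) {
                                // total and count are read separately, so this is a close approximation
                                logger.info("avgLatency=" + (total / count));
                            }
                        }
                    })
                    .to("mock:mock");
            }
        };
    }
}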


Tuesday, August 17, 2010

managing ActiveMQ with JMX APIs

here is a quick example of how to programmatically access ActiveMQ MBeans to monitor and manipulate message queues...

first, get a connection to a JMX server (this assumes localhost, port 1099, no auth)
note: always cache the connection and reuse it for subsequent requests (opening a new one per request can cause memory utilization issues)

JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi");
JMXConnector jmxc = JMXConnectorFactory.connect(url);
MBeanServerConnection conn = jmxc.getMBeanServerConnection();
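
the caching advice above could be sketched roughly like this. CachedJmxConnection is a hypothetical helper name (it is not part of ActiveMQ or the JDK), and it assumes a single JMX URL shared by all callers; it only uses the standard javax.management.remote calls shown above.

```java
import java.io.IOException;
import javax.management.MBeanServerConnection;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

/** Hypothetical helper: opens one JMX connector lazily and reuses it. */
final class CachedJmxConnection implements AutoCloseable {
    private final JMXServiceURL url;
    private JMXConnector connector; // guarded by "this"

    CachedJmxConnection(String serviceUrl) throws IOException {
        // a bad URL is rejected here with MalformedURLException (an IOException)
        this.url = new JMXServiceURL(serviceUrl);
    }

    /** Connects on first use; later calls go through the same connector. */
    synchronized MBeanServerConnection get() throws IOException {
        if (connector == null) {
            connector = JMXConnectorFactory.connect(url);
        }
        return connector.getMBeanServerConnection();
    }

    @Override
    public synchronized void close() throws IOException {
        if (connector != null) {
            try {
                connector.close();
            } finally {
                connector = null; // a later get() will reconnect
            }
        }
    }
}
```

create one instance for the URL above, call get() wherever a connection is needed, and close() it on shutdown. this sketch does no error recovery: if the broker restarts, you would have to call close() yourself before the next get().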

then, you can execute various operations such as addQueue, removeQueue, etc...

String operationName="addQueue";
String parameter="MyNewQueue";
ObjectName activeMQ = new ObjectName("org.apache.activemq:BrokerName=localhost,Type=Broker");
if(parameter != null) {
    Object[] params = {parameter};
    String[] sig = {"java.lang.String"};
    conn.invoke(activeMQ, operationName, params, sig);
} else {
    conn.invoke(activeMQ, operationName,null,null);
}
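
if you want to try the invoke() mechanics without a running broker, here is a small self-contained sketch. DemoBroker and DemoBrokerMBean are hypothetical stand-ins registered on the JVM's own platform MBean server (they are not ActiveMQ classes), and the "demo" ObjectName domain is made up for the example. the point is just how the operation name, the parameter array and the signature array line up.

```java
import java.lang.management.ManagementFactory;
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import javax.management.MBeanServer;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

class JmxInvokeSketch {

    /** Hypothetical management interface standing in for the broker MBean. */
    public interface DemoBrokerMBean {
        void addQueue(String name);
        void removeQueue(String name);
        String[] getQueueNames();
    }

    /** Standard MBean: the class name plus "MBean" must match the interface name. */
    public static class DemoBroker implements DemoBrokerMBean {
        private final Set<String> queues = new ConcurrentSkipListSet<>();
        @Override public void addQueue(String name) { queues.add(name); }
        @Override public void removeQueue(String name) { queues.remove(name); }
        @Override public String[] getQueueNames() { return queues.toArray(new String[0]); }
    }

    /** Invokes an operation whose parameters are all Strings. */
    static Object invoke(MBeanServerConnection conn, ObjectName target,
                         String operation, String... args) throws Exception {
        String[] signature = new String[args.length];
        Arrays.fill(signature, String.class.getName()); // "java.lang.String"
        return conn.invoke(target, operation, args, signature);
    }

    public static void main(String[] argv) throws Exception {
        // MBeanServer extends MBeanServerConnection, so it can play the role of "conn"
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName broker = new ObjectName("demo:type=Broker,brokerName=localhost");
        server.registerMBean(new DemoBroker(), broker);

        invoke(server, broker, "addQueue", "MyNewQueue");
        String[] names = (String[]) server.getAttribute(broker, "QueueNames");
        System.out.println(Arrays.toString(names));
    }
}
```

the varargs helper also covers the no-argument case: with no args it passes empty arrays, which is equivalent to the null/null call above.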

also, you can get an ActiveMQ QueueViewMBean instance for a specified queue name...

ObjectName activeMQ = new ObjectName("org.apache.activemq:BrokerName=localhost,Type=Broker");
BrokerViewMBean mbean = (BrokerViewMBean) MBeanServerInvocationHandler.newProxyInstance(conn, activeMQ,BrokerViewMBean.class, true);

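// queueName, cacheKey and queueViewBeanCache come from the enclosing lookup method (not shown)
for (ObjectName name : mbean.getQueues()) {
    // use the same connection ("conn") that was opened above
    QueueViewMBean queueMbean = (QueueViewMBean)
           MBeanServerInvocationHandler.newProxyInstance(conn, name, QueueViewMBean.class, true);

    if (queueMbean.getName().equals(queueName)) {
        queueViewBeanCache.put(cacheKey, queueMbean);
        return queueMbean;
    }
}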
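
the same lookup-by-name idea can be tried standalone. this is only a sketch under a few assumptions: DemoQueue and DemoQueueMBean are hypothetical stand-ins for the real queue view, the "demo" domain and the type=Queue key are made up, and it swaps in two JDK alternatives to the calls above (queryNames() with an ObjectName pattern instead of getQueues(), and JMX.newMBeanProxy() instead of MBeanServerInvocationHandler.newProxyInstance()).

```java
import java.lang.management.ManagementFactory;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.JMX;
import javax.management.MBeanServer;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

class JmxProxySketch {

    /** Hypothetical, much-reduced stand-in for a queue view interface. */
    public interface DemoQueueMBean {
        String getName();
        long getQueueSize();
        void purge();
    }

    public static class DemoQueue implements DemoQueueMBean {
        private final String name;
        private final AtomicLong size;

        public DemoQueue(String name, long size) {
            this.name = name;
            this.size = new AtomicLong(size);
        }

        @Override public String getName() { return name; }
        @Override public long getQueueSize() { return size.get(); }
        @Override public void purge() { size.set(0); }
    }

    /** Returns a typed proxy for the queue with the given name, or null if none matches. */
    static DemoQueueMBean findQueue(MBeanServerConnection conn, String domain,
                                    String queueName) throws Exception {
        ObjectName pattern = new ObjectName(domain + ":type=Queue,*");
        for (ObjectName name : conn.queryNames(pattern, null)) {
            DemoQueueMBean proxy = JMX.newMBeanProxy(conn, name, DemoQueueMBean.class);
            if (queueName.equals(proxy.getName())) {
                return proxy;
            }
        }
        return null;
    }

    public static void main(String[] argv) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        server.registerMBean(new DemoQueue("orders", 7),
                new ObjectName("demo:type=Queue,destinationName=orders"));
        server.registerMBean(new DemoQueue("audit", 3),
                new ObjectName("demo:type=Queue,destinationName=audit"));

        DemoQueueMBean audit = findQueue(server, "demo", "audit");
        System.out.println(audit.getName() + " size=" + audit.getQueueSize());
    }
}
```

each getName() call on a proxy is a remote attribute read when conn is a real remote connection, which is one reason to cache the proxy once found, as the snippet above does.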

then, execute one of several APIs against the QueueViewMBean instance...

queue monitoring - getEnqueueCount(), getDequeueCount(), getConsumerCount(), etc...

queue manipulation - purge(), getMessage(String messageId), removeMessage(String messageId), moveMessageTo(String messageId, String destinationName), copyMessageTo(String messageId, String destinationName), etc...
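
for monitoring, one simple use of the counters is to turn two readings into rates. this is a sketch, assuming getEnqueueCount() and getDequeueCount() behave as cumulative counters and that you record the time of each reading yourself; QueueRateSketch and Sample are hypothetical names, not ActiveMQ classes.

```java
class QueueRateSketch {

    /** One reading of the cumulative counters, taken at a known time. */
    static final class Sample {
        final long enqueueCount;
        final long dequeueCount;
        final long timestampMillis;

        Sample(long enqueueCount, long dequeueCount, long timestampMillis) {
            this.enqueueCount = enqueueCount;
            this.dequeueCount = dequeueCount;
            this.timestampMillis = timestampMillis;
        }
    }

    /** Messages enqueued per second between the two samples. */
    static double enqueueRate(Sample earlier, Sample later) {
        return perSecond(later.enqueueCount - earlier.enqueueCount,
                later.timestampMillis - earlier.timestampMillis);
    }

    /** Messages dequeued per second between the two samples. */
    static double dequeueRate(Sample earlier, Sample later) {
        return perSecond(later.dequeueCount - earlier.dequeueCount,
                later.timestampMillis - earlier.timestampMillis);
    }

    private static double perSecond(long delta, long elapsedMillis) {
        if (elapsedMillis <= 0) {
            throw new IllegalArgumentException("samples must be in time order");
        }
        return delta * 1000.0 / elapsedMillis;
    }
}
```

an enqueue rate that stays above the dequeue rate means the queue is backing up. if the statistics are reset between two readings the delta goes negative, so a real poller would want to discard that interval.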

summary
These APIs can easily be used to build a web-based or command-line tool for remote ActiveMQ management. That said, all of these features are also available through the JMX console itself, and ActiveMQ provides a web console that covers some management/monitoring tasks.

See these pages for more information...

http://activemq.apache.org/jmx-support.html
http://activemq.apache.org/web-console.html