package com.citrix.perf.jms.commsvc; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.DestinationStatistics; import org.apache.activemq.broker.jmx.ManagementContext; import org.apache.activemq.pool.PooledConnectionFactory; import javax.jms.Connection; import javax.jms.Session; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.BytesMessage; import javax.jms.JMSException; import javax.jms.DeliveryMode; import javax.jms.Message; import java.io.File; public class PersistentStorageCleanup { public static void printStats(BrokerService broker) throws Exception { DestinationStatistics[] stats = new DestinationStatistics[2]; org.apache.activemq.broker.region.Destination consumedQueue = broker.getDestination(new ActiveMQQueue("consume.queue")); org.apache.activemq.broker.region.Destination unconsumedQueue = broker.getDestination(new ActiveMQQueue("no.consume.queue")); if(consumedQueue != null) { stats[0] = consumedQueue.getDestinationStatistics(); System.out.println("Consumed queue size : " + stats[0].getMessages().getCount()); System.out.println("Consumed queue dequeues : " + stats[0].getDequeues().getCount()); } if(unconsumedQueue != null) { stats[1] = unconsumedQueue.getDestinationStatistics(); System.out.println("Unconsumed queue size : " + stats[1].getMessages().getCount()); } System.out.println("Store usage : " + broker.getSystemUsage().getStoreUsage()); System.out.println(); } public static void main(String[] args) throws Exception { BrokerService broker = new BrokerService(); broker.setPersistent(true); broker.setBrokerName("brokerOne"); broker.setUseJmx(true); ManagementContext context = new ManagementContext(); context.setRmiServerPort(1699); context.setConnectorPort(1699); broker.setManagementContext(context); broker.addConnector("tcp://localhost:61616"); broker.start(); String dataDirectory = args[0]; final ActivemqConsumer consumer = new ActivemqConsumer("consume.queue"); new Thread(new Runnable() { public void run() { while(true) { try { consumer.consume(); } catch(Exception e) { e.printStackTrace(); } } } }).start(); System.out.println("Started normal consumer"); final ActivemqProducer producerWithConsumer = new ActivemqProducer("consume.queue"); new Thread(new Runnable() { public void run() { while(true) { try { producerWithConsumer.produce(200000); try { Thread.sleep(100); } catch(InterruptedException e) { } } catch(Exception e) { e.printStackTrace(); } } } }).start(); System.out.println("Started normal producer"); int prevLength = 1; //Wait for persistence directory to be created while(!new File(dataDirectory).exists()) { try { Thread.sleep(1000); } catch(InterruptedException e) { } } //Produce messages to queue without consumer whenever a new log file is created int id = 0; final ActivemqProducer producerWithoutConsumer = new ActivemqProducer("no.consume.queue"); while(true) { System.out.println("Num data files : " + (new File(dataDirectory).list().length - 1)); if(new File(dataDirectory).list().length > prevLength) { producerWithoutConsumer.produce(1000); prevLength = new File(dataDirectory).list().length; System.out.println("Produced message to queue without consumer : " + id++); printStats(broker); } try { Thread.sleep(5000); } catch(InterruptedException e) { } } } } class ActivemqConsumer { private Connection connection; private Session session; private MessageConsumer consumer; public ActivemqConsumer(String destinationName) throws Exception { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://brokerOne"); final PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(connectionFactory); connection = pooledConnectionFactory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(destinationName); consumer = session.createConsumer(destination); } public void consume() throws JMSException { consumer.receive(); } public void stop() throws JMSException { consumer.close(); session.close(); connection.close(); } } class ActivemqProducer { private Connection connection; private Session session; private MessageProducer producer; ActivemqProducer(String destinationName) throws Exception { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://brokerOne"); final PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(connectionFactory); connection = pooledConnectionFactory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(destinationName); producer = session.createProducer(destination); } public void produce(int length) throws JMSException { BytesMessage message = session.createBytesMessage(); message.writeBytes(new byte[length]); producer.send(message, DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); } public void stop() throws JMSException { producer.close(); session.close(); connection.close(); } }