Index: activemq-core/src/test/java/org/apache/activemq/perf/NetworkedSyncTest.java =================================================================== --- activemq-core/src/test/java/org/apache/activemq/perf/NetworkedSyncTest.java (revision 0) +++ activemq-core/src/test/java/org/apache/activemq/perf/NetworkedSyncTest.java (revision 0) @@ -0,0 +1,305 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.perf; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import junit.framework.TestCase; +import junit.textui.TestRunner; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.network.NetworkConnector; +import org.apache.activemq.xbean.BrokerFactoryBean; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.xbean.spring.context.ClassPathXmlApplicationContext; +import org.springframework.core.io.ClassPathResource; + + +public class NetworkedSyncTest extends TestCase { + + // constants + public static final int MESSAGE_COUNT = 10000; //100000; + public final static String config = "org/apache/activemq/perf/networkSync.xml"; + public final static String broker1URL = "tcp://localhost:61616"; + public final static String broker2URL = "tcp://localhost:62616"; + private final String networkConnectorURL = "static://(" + broker2URL + ")"; + private static final Log LOG = LogFactory.getLog(NetworkedSyncTest.class); + BrokerService broker1 = null; + BrokerService broker2 = null; + NetworkConnector connector = null; + + /** + * @param name + */ + public NetworkedSyncTest(String name) { + super(name); + LOG.info("Testcase started."); + } + + public static void main(String args[]) { + TestRunner.run(NetworkedSyncTest.class); + } + + /** + * @throws java.lang.Exception + */ + protected void setUp() throws Exception { + LOG.info("setUp() called."); + ClassPathXmlApplicationContext context1 = null; + BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource(config)); + + /* start up first broker instance */ + try { + // resolve broker1 + Thread.currentThread().setContextClassLoader( + NetworkedSyncTest.class.getClassLoader()); + context1 = new ClassPathXmlApplicationContext(config); + broker1 = (BrokerService) context1.getBean("broker1"); + + // start the broker + if (!broker1.isStarted()) { + LOG.info("Broker broker1 not yet started. Kicking it off now."); + broker1.start(); + } else { + LOG.info("Broker broker1 already started. Not kicking it off a second time."); + broker1.waitUntilStopped(); + } + } catch (Exception e) { + LOG.fatal("Error: " + e.getMessage()); + throw e; + // brokerService.stop(); + } + + /* start up second broker instance */ + try { + Thread.currentThread().setContextClassLoader( + NetworkedSyncTest.class.getClassLoader()); + context1 = new ClassPathXmlApplicationContext(config); + broker2 = (BrokerService) context1.getBean("broker2"); + + // start the broker + if (!broker2.isStarted()) { + LOG.info("Broker broker2 not yet started. Kicking it off now."); + broker2.start(); + } else { + LOG.info("Broker broker2 already started. Not kicking it off a second time."); + broker2.waitUntilStopped(); + } + } catch (Exception e) { + LOG.fatal("Error: " + e.getMessage()); + throw e; + } + + // setup network connector from broker1 to broker2 + connector = broker1.addNetworkConnector(networkConnectorURL); + connector.setBrokerName(broker1.getBrokerName()); + connector.setDuplex(true); + connector.start(); + LOG.info("Network connector created."); + } + + /** + * @throws java.lang.Exception + */ + protected void tearDown() throws Exception { + + LOG.info("tearDown() called."); + + if (broker1 != null && broker1.isStarted()) { + LOG.info("Broker1 still running, stopping it now."); + broker1.stop(); + } else { + LOG.info("Broker1 not running, nothing to shutdown."); + } + if (broker2 != null && broker2.isStarted()) { + LOG.info("Broker2 still running, stopping it now."); + broker2.stop(); + } else { + LOG.info("Broker2 not running, nothing to shutdown."); + } + + } + + public void testMessageExchange() throws Exception { + LOG.info("testMessageExchange() called."); + + // create producer and consumer threads + Thread producer = new Thread(new Producer()); + Thread consumer = new Thread(new Consumer()); + // start threads + consumer.start(); + Thread.sleep(2000); + producer.start(); + + + // wait for threads to finish + producer.join(); + consumer.join(); + } +} + +/** + * Message producer running as a separate thread, connecting to broker1 + * + * @author tmielke + * + */ +class Producer implements Runnable { + + private static final Log LOG = LogFactory.getLog(Producer.class); + + /** + * connect to broker and constantly send messages + */ + public void run() { + + Connection connection = null; + Session session = null; + MessageProducer producer = null; + + try { + ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory( + NetworkedSyncTest.broker1URL); + connection = amq.createConnection(); + + connection.setExceptionListener(new javax.jms.ExceptionListener() { + public void onException(javax.jms.JMSException e) { + e.printStackTrace(); + } + }); + + connection.start(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic destination = session.createTopic("TEST.FOO"); + + producer = session.createProducer(destination); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + long counter = 0; + + // Create and send message + for (int i = 0; i < NetworkedSyncTest.MESSAGE_COUNT; i++) { + + String text = "Hello world! From: " + + Thread.currentThread().getName() + " : " + + this.hashCode() + ":" + counter; + TextMessage message = session.createTextMessage(text); + producer.send(message); + counter++; + + if ((counter % 1000) == 0) + LOG.info("sent " + counter + " messages"); + + } + } catch (Exception ex) { + LOG.error(ex); + return; + } finally { + try { + if (producer != null) + producer.close(); + if (session != null) + session.close(); + if (connection != null) + connection.close(); + } catch (Exception e) { + LOG.error("Problem closing down JMS objects: " + e); + } + } + } +} + +/* + * * Message consumer running as a separate thread, connecting to broker2 + * @author tmielke + * + */ +class Consumer implements Runnable { + + private static final Log LOG = LogFactory.getLog(Consumer.class);; + + + /** + * connect to broker and receive messages + */ + public void run() { + Connection connection = null; + Session session = null; + MessageConsumer consumer = null; + + try { + ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory( + NetworkedSyncTest.broker2URL); + connection = amq.createConnection(); + // need to set clientID when using durable subscription. + connection.setClientID("tmielke"); + + connection.setExceptionListener(new javax.jms.ExceptionListener() { + public void onException(javax.jms.JMSException e) { + e.printStackTrace(); + } + }); + + connection.start(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createTopic("TEST.FOO"); + consumer = session.createDurableSubscriber((Topic) destination,"tmielke"); + + long counter = 0; + // Wait for a message + for (int i = 0; i < NetworkedSyncTest.MESSAGE_COUNT; i++) { + Message message2 = consumer.receive(); + if (message2 instanceof TextMessage) { + TextMessage textMessage = (TextMessage) message2; + String text = textMessage.getText(); + // logger.info("Received: " + text); + } else { + LOG.error("Received message of unsupported type. Expecting TextMessage. "+ message2); + } + counter++; + if ((counter % 1000) == 0) + LOG.info("received " + counter + " messages"); + + + } + } catch (Exception e) { + LOG.error("Error in Consumer: " + e); + return; + } finally { + try { + if (consumer != null) + consumer.close(); + if (session != null) + session.close(); + if (connection != null) + connection.close(); + } catch (Exception ex) { + LOG.error("Error closing down JMS objects: " + ex); + } + } + } +} Index: activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/DataFileAppenderTest.java =================================================================== --- activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/DataFileAppenderTest.java (revision 0) +++ activemq-core/src/test/java/org/apache/activemq/kaha/impl/async/DataFileAppenderTest.java (revision 0) @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.kaha.impl.async; + +import java.io.File; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import junit.framework.TestCase; + +import org.apache.activemq.util.ByteSequence; + +public class DataFileAppenderTest extends TestCase { + AsyncDataManager dataManager; + File dir; + + @Override + public void setUp() throws Exception { + dir = new File("target/tests/DataFileAppenderTest"); + dir.mkdirs(); + dataManager = new AsyncDataManager(); + dataManager.setDirectory(dir); + configure(dataManager); + dataManager.start(); + } + + protected void configure(AsyncDataManager dataManager) { + dataManager.setUseNio(false); + } + + @Override + public void tearDown() throws Exception { + dataManager.close(); + deleteFilesInDirectory(dir); + dir.delete(); + } + + private void deleteFilesInDirectory(File directory) { + File[] files = directory.listFiles(); + for (int i=0; i= iterations); + Thread.sleep(1000); + assertTrue("queued data is written", dataManager.getInflightWrites().isEmpty()); + } + + + public void testBatchWriteCallbackCompleteAfterTimeout() throws Exception { + final int iterations = 10; + final CountDownLatch latch = new CountDownLatch(iterations); + ByteSequence data = new ByteSequence("DATA".getBytes()); + for (int i=0; i= iterations); + assertEquals("none written", iterations, latch.getCount()); + Thread.sleep(1000); + assertTrue("queued data is written", dataManager.getInflightWrites().isEmpty()); + assertEquals("none written", 0, latch.getCount()); + } + + public void testBatchWriteCallbackCompleteAfterClose() throws Exception { + final int iterations = 10; + final CountDownLatch latch = new CountDownLatch(iterations); + ByteSequence data = new ByteSequence("DATA".getBytes()); + for (int i=0; i= iterations); + assertEquals("none written", iterations, latch.getCount()); + dataManager.close(); + assertTrue("queued data is written", dataManager.getInflightWrites().isEmpty()); + assertEquals("none written", 0, latch.getCount()); + } + + public void testBatchWriteCompleteAfterClose() throws Exception { + ByteSequence data = new ByteSequence("DATA".getBytes()); + final int iterations = 10; + for (int i=0; i= iterations); + dataManager.close(); + assertTrue("queued data is written:" + dataManager.getInflightWrites().size(), dataManager.getInflightWrites().isEmpty()); + } + + public void testBatchWriteToMaxMessageSize() throws Exception { + final int iterations = 4; + final CountDownLatch latch = new CountDownLatch(iterations); + Runnable done = new Runnable() { + public void run() { + latch.countDown(); + } + }; + int messageSize = DataFileAppender.DEFAULT_MAX_BATCH_SIZE / iterations; + byte[] message = new byte[messageSize]; + ByteSequence data = new ByteSequence(message); + + for (int i=0; i< iterations - 1; i++) { + dataManager.write(data, done); + } + assertEquals("all writes are queued", iterations, latch.getCount()); + dataManager.write(data, done); + latch.await(10, TimeUnit.SECONDS); // write may take some time + assertEquals("all callbacks complete", 0, latch.getCount()); + } + + public void testNoBatchWriteWithSync() throws Exception { + ByteSequence data = new ByteSequence("DATA".getBytes()); + final int iterations = 10; + for (int i=0; i + + + + + + + + + + + + + + + + + + + + + + + + + + + Index: activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java =================================================================== --- activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java (revision 651544) +++ activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java (working copy) @@ -30,6 +30,8 @@ * An optimized writer to do batch appends to a data file. This object is thread * safe and gains throughput as you increase the number of concurrent writes it * does. + * In the absence of concurrency, writes are batched till full or till a sync is + * requested or on the expiry of DEFAULT_MAX_WRITE_DELAY * * @version $Revision: 1.1.1.1 $ */ @@ -37,7 +39,8 @@ protected static final byte[] RESERVED_SPACE = new byte[AsyncDataManager.ITEM_HEAD_RESERVED_SPACE]; protected static final String SHUTDOWN_COMMAND = "SHUTDOWN"; - + protected static final int DEFAULT_MAX_BATCH_SIZE = 1024 * 1024 * 4; + protected static final int DEFAULT_MAX_WRITE_DELAY = 100; protected final AsyncDataManager dataManager; protected final Map inflightWrites; protected final Object enqueueMutex = new Object(){}; @@ -46,8 +49,8 @@ protected boolean shutdown; protected IOException firstAsyncException; protected final CountDownLatch shutdownDone = new CountDownLatch(1); - protected int maxWriteBatchSize = 1024 * 1024 * 4; - + protected int maxWriteBatchSize = DEFAULT_MAX_BATCH_SIZE; + protected int maxWriteDelayMillis = DEFAULT_MAX_WRITE_DELAY; private boolean running; private Thread thread; @@ -82,18 +85,22 @@ public final WriteCommand first; public final CountDownLatch latch = new CountDownLatch(1); public int size; + public boolean needsSync; public WriteBatch(DataFile dataFile, WriteCommand write) throws IOException { this.dataFile = dataFile; this.first = write; size += write.location.getSize(); + needsSync |= write.sync; } public boolean canAppend(DataFile dataFile, WriteCommand write) { if (dataFile != this.dataFile) { + needsSync = true; return false; } if (size + write.location.getSize() >= maxWriteBatchSize) { + needsSync = true; return false; } return true; @@ -102,6 +109,7 @@ public void append(WriteCommand write) throws IOException { this.first.getTailNode().linkAfter(write); size += write.location.getSize(); + needsSync |= write.sync; } } @@ -245,7 +253,11 @@ if (nextWriteBatch.canAppend(dataFile, write)) { nextWriteBatch.append(write); rc = nextWriteBatch; + if (write.sync) { + enqueueMutex.notifyAll(); + } } else { + enqueueMutex.notifyAll(); // Otherwise wait for the queuedCommand to be null try { while (nextWriteBatch != null) { @@ -273,6 +285,7 @@ if (!shutdown) { shutdown = true; if (running) { + shouldWrite(nextWriteBatch); enqueueMutex.notifyAll(); } else { shutdownDone.countDown(); @@ -311,16 +324,16 @@ // Block till we get a command. synchronized (enqueueMutex) { while (true) { + if (shouldWrite(nextWriteBatch)) { + o = nextWriteBatch; + nextWriteBatch = null; + break; + } if (shutdown) { o = SHUTDOWN_COMMAND; break; } - if (nextWriteBatch != null) { - o = nextWriteBatch; - nextWriteBatch = null; - break; - } - enqueueMutex.wait(); + enqueueMutex.wait(maxWriteDelayMillis); } enqueueMutex.notify(); } @@ -421,5 +434,14 @@ } } + protected final boolean shouldWrite(WriteBatch batch) { + boolean ret = false; + if (batch != null) { + ret = batch.needsSync; + batch.needsSync = true; + } + return ret; + } + } Index: activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java =================================================================== --- activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java (revision 651544) +++ activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java (working copy) @@ -68,16 +68,16 @@ // Block till we get a command. synchronized (enqueueMutex) { while (true) { + if (shouldWrite(nextWriteBatch)) { + o = nextWriteBatch; + nextWriteBatch = null; + break; + } if (shutdown) { o = SHUTDOWN_COMMAND; break; } - if (nextWriteBatch != null) { - o = nextWriteBatch; - nextWriteBatch = null; - break; - } - enqueueMutex.wait(); + enqueueMutex.wait(maxWriteDelayMillis); } enqueueMutex.notify(); }