Issue Details (XML | Word | Printable)

Key: AMQ-2139
Type: Improvement Improvement
Status: Open Open
Priority: Major Major
Assignee: Hiram Chirino
Reporter: Hiram Chirino
Votes: 0
Watchers: 1
Operations

If you were logged in you would be able to see more operations.
ActiveMQ

Batch up multiple socket write calls in the TCP transport.

Created: 23/Feb/09 08:26 AM   Updated: 15/Jun/09 09:47 AM
Return to search
Component/s: Transport
Affects Version/s: 5.2.0
Fix Version/s: 6.0.0

Time Tracking:
Not Specified


 Description  « Hide
Investigate using an async write thread for the TCP transport. It would be able to more efficiently batch up multiple writes into a single socket write.

Bellow is a patch that should be investigated. It should increase write performance of the TCP transport:

$ svn diff
Index: activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
===================================================================
--- activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java	(revision 742546)
+++ activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java	(working copy)
@@ -29,7 +29,9 @@
 import java.net.UnknownHostException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -119,6 +121,9 @@
     private Boolean tcpNoDelay;
     private Thread runnerThread;
 
+    private final ArrayBlockingQueue<Object> outbound = new ArrayBlockingQueue<Object>(100);
+    private Thread onewayThread;
+
     /**
      * Connect to a remote Node - e.g. a Broker
      * 
@@ -157,16 +162,39 @@
         this.localLocation = null;
         setDaemon(true);
     }
-
+    
     /**
      * A one way asynchronous send
      */
     public void oneway(Object command) throws IOException {
         checkStarted();
-        wireFormat.marshal(command, dataOut);
-        dataOut.flush();
+        try {
+            outbound.put(command);
+        } catch (InterruptedException e) {
+            throw new InterruptedIOException();
+        }
     }
 
+    protected void sendOneways() {
+        try {
+            while(!isStopped()) {
+                Object command = outbound.poll(500, TimeUnit.MILLISECONDS);
+                if( command!=null ) {
+                    try {
+                        while( command!=null ) {
+                            wireFormat.marshal(command, dataOut);
+                            command = outbound.poll();
+                        }
+                        dataOut.flush();
+                    } catch (IOException e) {
+                        getTransportListener().onException(e);
+                    }
+                }
+            }
+        } catch (InterruptedException e) {
+        }
+    }
+
     /**
      * @return pretty print of 'this'
      */
@@ -399,6 +427,11 @@
 
     protected void doStart() throws Exception {
         connect();
+        onewayThread = new Thread(null, new Runnable(){
+            public void run() {
+                sendOneways();
+            }}, "ActiveMQ Transport Sender: " + toString(), getStackSize());
+        onewayThread.start();
         stoppedLatch.set(new CountDownLatch(1));
         super.doStart();
     }
@@ -487,8 +520,12 @@
                     LOG.debug("Caught exception closing socket",e);
                 }
             }
-           
         }
+        if( onewayThread!=null ) {
+            onewayThread.join();
+            onewayThread = null;
+            outbound.clear();
+        }
     }
 
     /**


 All   Comments   Work Log   Change History   Subversion Commits   FishEye   Crucible      Sort Order: Ascending order - Click to sort in descending order
There are no subversion log entries for this issue yet.