ActiveMQ
  1. ActiveMQ
  2. AMQ-2139

Batch up multiple socket write calls in the TCP transport.

    Details

    • Type: Improvement Improvement
    • Status: Open
    • Priority: Major Major
    • Resolution: Unresolved
    • Affects Version/s: 5.2.0
    • Fix Version/s: 6.0.0
    • Component/s: Transport
    • Labels:
      None

      Description

      Investigate using an async write thread for the TCP transport. It would be able to more efficiently batch up multiple writes into a single socket write.

      Bellow is a patch that should be investigated. It should increase write performance of the TCP transport:

      $ svn diff
      Index: activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
      ===================================================================
      --- activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java	(revision 742546)
      +++ activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java	(working copy)
      @@ -29,7 +29,9 @@
       import java.net.UnknownHostException;
       import java.util.HashMap;
       import java.util.Map;
      +import java.util.concurrent.ArrayBlockingQueue;
       import java.util.concurrent.CountDownLatch;
      +import java.util.concurrent.LinkedBlockingQueue;
       import java.util.concurrent.SynchronousQueue;
       import java.util.concurrent.ThreadFactory;
       import java.util.concurrent.ThreadPoolExecutor;
      @@ -119,6 +121,9 @@
           private Boolean tcpNoDelay;
           private Thread runnerThread;
       
      +    private final ArrayBlockingQueue<Object> outbound = new ArrayBlockingQueue<Object>(100);
      +    private Thread onewayThread;
      +
           /**
            * Connect to a remote Node - e.g. a Broker
            * 
      @@ -157,16 +162,39 @@
               this.localLocation = null;
               setDaemon(true);
           }
      -
      +    
           /**
            * A one way asynchronous send
            */
           public void oneway(Object command) throws IOException {
               checkStarted();
      -        wireFormat.marshal(command, dataOut);
      -        dataOut.flush();
      +        try {
      +            outbound.put(command);
      +        } catch (InterruptedException e) {
      +            throw new InterruptedIOException();
      +        }
           }
       
      +    protected void sendOneways() {
      +        try {
      +            while(!isStopped()) {
      +                Object command = outbound.poll(500, TimeUnit.MILLISECONDS);
      +                if( command!=null ) {
      +                    try {
      +                        while( command!=null ) {
      +                            wireFormat.marshal(command, dataOut);
      +                            command = outbound.poll();
      +                        }
      +                        dataOut.flush();
      +                    } catch (IOException e) {
      +                        getTransportListener().onException(e);
      +                    }
      +                }
      +            }
      +        } catch (InterruptedException e) {
      +        }
      +    }
      +
           /**
            * @return pretty print of 'this'
            */
      @@ -399,6 +427,11 @@
       
           protected void doStart() throws Exception {
               connect();
      +        onewayThread = new Thread(null, new Runnable(){
      +            public void run() {
      +                sendOneways();
      +            }}, "ActiveMQ Transport Sender: " + toString(), getStackSize());
      +        onewayThread.start();
               stoppedLatch.set(new CountDownLatch(1));
               super.doStart();
           }
      @@ -487,8 +520,12 @@
                           LOG.debug("Caught exception closing socket",e);
                       }
                   }
      -           
               }
      +        if( onewayThread!=null ) {
      +            onewayThread.join();
      +            onewayThread = null;
      +            outbound.clear();
      +        }
           }
       
           /**
      

        Activity

          People

          • Assignee:
            Hiram Chirino
            Reporter:
            Hiram Chirino
          • Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

            • Created:
              Updated:

              Development