  1. Qpid
  2. QPID-3016

Java JMS client ReplyTo memory leak


    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8
    • Fix Version/s: 0.9
    • Component/s: JMS AMQP 0-x
    • Labels:
    • Environment:

      Fedora 14:

      Broker: qpidd (qpidc) version 0.8

      Java Client 0.8

      OpenJDK Runtime Environment (IcedTea6 1.9.3) (fedora-
      OpenJDK 64-Bit Server VM (build 19.0-b09, mixed mode)


      I'm using the Java 0.8 client JMS interface. Relevant Client code is below.

      If I make async requests to a queue, with setJMSReplyTo set to a temporary queue for responses, the client process runs out of memory after about 3.2M request/response message pairs. This is with a 512MB max Java heap, and heap growth appears to be consistently linear. Note that I use a semaphore between request and response to keep at most ~1000 unanswered requests open in the client, blocking before sending more. Thus this should not be a matter of simply saturating the client with unsent messages.
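The semaphore throttle described above can be sketched with the standard library alone. This is a minimal illustration, not the reporter's client: the class name `ThrottleSketch`, the thread pool standing in for the broker, and the counters are all assumptions made for the example. The only parts taken from the report are the limit of 1000 permits and the acquire-before-send, release-on-response pattern.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the client: a thread pool plays the role of
// the broker delivering responses to onMessage().
final class ThrottleSketch {
    static final int MAX_IN_FLIGHT = 1000; // same limit as the report's Semaphore( 1000 )

    private final Semaphore permits = new Semaphore(MAX_IN_FLIGHT);
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger peak = new AtomicInteger();
    private final AtomicInteger completed = new AtomicInteger();
    private final ExecutorService responder = Executors.newFixedThreadPool(4);

    // Blocks once MAX_IN_FLIGHT requests are unanswered.
    void sendRequest() throws InterruptedException {
        permits.acquire();
        int now = inFlight.incrementAndGet();
        peak.accumulateAndGet(now, Math::max);   // record the high-water mark
        responder.execute(this::onResponse);     // simulated asynchronous reply
    }

    // Mirrors onMessage(): one permit is returned per response.
    void onResponse() {
        completed.incrementAndGet();
        inFlight.decrementAndGet();
        permits.release();
    }

    int completed() {
        return completed.get();
    }

    // Sends the given number of requests and returns the peak number in flight.
    int run(int requests) {
        try {
            for (int i = 0; i < requests; i++) {
                sendRequest();
            }
            responder.shutdown();
            if (!responder.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("responses still pending");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return peak.get();
    }

    public static void main(String[] args) {
        ThrottleSketch sketch = new ThrottleSketch();
        int peak = sketch.run(100_000);
        System.out.println("peak in flight: " + peak + ", completed: " + sketch.completed());
    }
}
```

Because a permit is taken before each send and returned only after the matching response, the number of unanswered requests cannot exceed the permit count, which is why unsent or unanswered messages alone should not account for unbounded heap growth.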

      Here is the top of the jhat histogram (class, instance count, total size in bytes) from a jmap heap dump taken shortly before the client runs out of memory:

      class org.apache.qpid.collections.ReferenceMap$SoftRef 3253120 143137280
      class org.apache.qpid.collections.ReferenceMap$Entry 3253120 117112320
      class [Lorg.apache.qpid.collections.ReferenceMap$Entry; 1 67108880
      class org.apache.qpid.transport.ReplyTo 3253120 61809280

      Note that 3253120 appears to match the total number of request/replies successfully processed by the client up to this point. In other words, it is leaking one ReplyTo object and its associated (not so?) soft references per request/response.
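The histogram figures can be checked with a little arithmetic. The sketch below uses only the numbers quoted in the report; the class name `HistogramMath` and its helper methods are hypothetical, and the reading of the array size (a 16-byte header plus 4-byte slots) is an assumption about how jhat sized the array, not something the report states.

```java
// Worked arithmetic on the jhat histogram rows quoted in the report.
final class HistogramMath {
    static final long INSTANCES     = 3_253_120L;   // count shared by SoftRef, Entry, ReplyTo
    static final long SOFT_REF_SIZE = 143_137_280L; // ReferenceMap$SoftRef total bytes
    static final long ENTRY_SIZE    = 117_112_320L; // ReferenceMap$Entry total bytes
    static final long TABLE_SIZE    = 67_108_880L;  // single ReferenceMap$Entry[] array
    static final long REPLY_TO_SIZE = 61_809_280L;  // transport.ReplyTo total bytes

    static long bytesPerInstance(long totalBytes) {
        return totalBytes / INSTANCES;
    }

    // Bytes retained per request/response pair, excluding the shared table.
    static long perRequestBytes() {
        return bytesPerInstance(SOFT_REF_SIZE)   // 44
             + bytesPerInstance(ENTRY_SIZE)      // 36
             + bytesPerInstance(REPLY_TO_SIZE);  // 19
    }

    static long totalBytes() {
        return SOFT_REF_SIZE + ENTRY_SIZE + TABLE_SIZE + REPLY_TO_SIZE;
    }

    // Assumption: 16-byte array header and 4 bytes per slot.
    static long assumedTableSlots() {
        return (TABLE_SIZE - 16) / 4;
    }

    public static void main(String[] args) {
        System.out.println("bytes per request: " + perRequestBytes());
        System.out.println("total bytes:       " + totalBytes());
        System.out.printf("total MiB:         %.1f%n", totalBytes() / (1024.0 * 1024.0));
        System.out.println("assumed slots:     " + assumedTableSlots());
    }
}
```

Each total divides evenly by the instance count, giving 44 + 36 + 19 = 99 bytes per request/response pair as reported by jhat. The four rows sum to 389,167,760 bytes, about 371 MiB, or roughly 72% of the 512MB heap, which is consistent with the map being the dominant consumer at the time of the dump. Under the stated sizing assumption, the array holds 16,777,216 (2^24) slots.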

      If instead I replace the temporary queue with a fixed response queue and drop the use of setJMSReplyTo(), the client works fine, with no memory leak.

      Below are more details when running with the temp response queue:

      % qpid-config queues

      Queue Name                                      Attributes
      TempQueued4051d9d-37d3-4306-a1fc-91b93f7082c8   auto-del excl
      iudex-brutefuzzy-request                        --max-queue-size=100000 --limit-policy=reject

      Client startup Log:

      635 [main] INFO o.a.q.j.PropertiesFileInitialContextFactory - No Provider URL specified.
      726 [main] INFO o.a.qpid.client.AMQConnection - Connection:amqp://qpid:********@default-client/default-vhost?brokerlist='tcp://localhost:5672'
      973 [main] INFO o.a.q.c.p.AMQProtocolSession - Using ProtocolVersion for Session:0-10
      990 [main] INFO o.a.q.c.h.ClientMethodDispatcherImpl - New Method Dispatcher:AMQProtocolSession[null]
      1002 [main] INFO o.a.qpid.client.AMQConnection - Connecting with ProtocolHandler Version:0-10
      1150 [main] INFO o.a.qpid.client.AMQConnection - Connected with ProtocolHandler Version:0-10
      1192 [main] INFO o.a.qpid.client.AMQSession - Created session:org.apache.qpid.client.AMQSession_0_10@1b7c63f
      1280 [main] INFO o.a.q.c.BasicMessageProducer_0_10 - MessageProducer org.apache.qpid.client.BasicMessageProducer_0_10@1727745 using publish mode : ASYNC_PUBLISH_ALL
      1503 [main] INFO o.a.qpid.client.AMQSession - Prefetching delayed existing messages will not flow until requested via receive*() or setML().
      1586 [main] INFO o.a.q.c.AMQSession.Dispatcher - Dispatcher-Channel-1 created
      1586 [Dispatcher-Channel-1] INFO o.a.q.c.AMQSession.Dispatcher - Dispatcher-Channel-1 started

      Relevant client code:

      public class Client
          implements MessageListener, Closeable, ExceptionListener
      {
          public Client( JMSContext context )
              throws JMSException, NamingException
          {
              _connection = context.createConnection();
              _session = context.createSession( _connection );
              Destination requestQueue =
                  context.lookupDestination( "iudex-brutefuzzy-request" );
              _producer = _session.createProducer( requestQueue );
              context.close();

              _responseQueue = _session.createTemporaryQueue();
              _session.createConsumer( _responseQueue ).setMessageListener( this );
              _connection.start();
          }

          public void sendRequest( long simhash, boolean doAdd )
              throws JMSException, InterruptedException
          {
              Builder bldr = Request.newBuilder();
              bldr.setSimhash( simhash );
              bldr.setAction( doAdd ? RequestAction.ADD : RequestAction.CHECK_ONLY );

              BytesMessage response = _session.createBytesMessage();
              response.writeBytes( bldr.build().toByteArray() );

              if( _responseQueue != null ) {
                  response.setJMSReplyTo( _responseQueue );
              }

              _producer.send( response );
          }

          public void onMessage( Message msg )
          {
              try {
                  //Handle response
                  msg.acknowledge();
                  _semaphore.release();
              }
              catch( JMSException x ) {
                  if( _log.isDebugEnabled() ) _log.error( "onMessage:", x );
                  else _log.error( "onMessage: {}", x.toString() );
              }
          }

          // ... (other methods not shown)

          private Session _session;
          private MessageProducer _producer;
          private boolean _createTemporaryResponseQueue = true;
          private Destination _responseQueue = null;
          private Connection _connection = null;
          private final Semaphore _semaphore = new Semaphore( 1000 );
          private Logger _log = LoggerFactory.getLogger( getClass() );
      }





              • Assignee:
                Robbie Gemmell (gemmellr)
              • Reporter:
                David Kellum (dekellum)


                • Created: