Uploaded image for project: 'Qpid'
  1. Qpid
  2. QPID-3016

Java JMS client ReplyTo memory leak

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Closed
    • Major
    • Resolution: Fixed
    • 0.8
    • 0.9
    • JMS AMQP 0-x
    • None
    • Fedora 14: 2.6.35.10-74.fc14.x86_64

      Broker: qpidd (qpidc) version 0.8

      Java Client 0.8

      OpenJDK Runtime Environment (IcedTea6 1.9.3) (fedora-49.1.9.3.fc14-x86_64)
      OpenJDK 64-Bit Server VM (build 19.0-b09, mixed mode)

    Description

      I'm using the Java 0.8 client JMS interface. Relevant Client code is below.

      If I make async requests to a queue with setJMSReplyTo as temporary queue for responses, the client process will run out of memory after about 3.2M request/response message pairs. This is at 512MB max java heap, though growth appears to be consistently linear. Note that I use a semaphore between request and response to keep ~1000 unanswered requests open in the client and block before sending more. Thus this should not be a matter of simply saturating the client with unsent messages.

      Here is the top of the jhat histogram from jmap heap dump shortly before client runs out of memory:

      class org.apache.qpid.collections.ReferenceMap$SoftRef 3253120 143137280
      class org.apache.qpid.collections.ReferenceMap$Entry 3253120 117112320
      class [Lorg.apache.qpid.collections.ReferenceMap$Entry; 1 67108880
      class org.apache.qpid.transport.ReplyTo 3253120 61809280

      Note that 3253120 appears to match the total number of request/replies successfully processed by the client up to this point. Or in other words, its leaking one ReplyTo object and associated (not so?) soft references per request/response.

      If instead I replace the temporary queue with a fixed response and drop use of setJMSReplyTo(), the client works fine, no memory leak.

      Below are more details when running with the temp response queue:

      % qpid-config queues

      Queue Name Attributes
      ======================================================================
      TempQueued4051d9d-37d3-4306-a1fc-91b93f7082c8 auto-del excl
      iudex-brutefuzzy-request --max-queue-size=100000 --limit-policy=reject

      Client startup Log:

      635 [main] INFO o.a.q.j.PropertiesFileInitialContextFactory - No Provider URL specified.
      726 [main] INFO o.a.qpid.client.AMQConnection - Connection:amqp://qpid:********@default-client/default-vhost?brokerlist='tcp://localhost:5672'
      973 [main] INFO o.a.q.c.p.AMQProtocolSession - Using ProtocolVersion for Session:0-10
      990 [main] INFO o.a.q.c.h.ClientMethodDispatcherImpl - New Method Dispatcher:AMQProtocolSession[null]
      1002 [main] INFO o.a.qpid.client.AMQConnection - Connecting with ProtocolHandler Version:0-10
      1150 [main] INFO o.a.qpid.client.AMQConnection - Connected with ProtocolHandler Version:0-10
      1192 [main] INFO o.a.qpid.client.AMQSession - Created session:org.apache.qpid.client.AMQSession_0_10@1b7c63f
      1280 [main] INFO o.a.q.c.BasicMessageProducer_0_10 - MessageProducer org.apache.qpid.client.BasicMessageProducer_0_10@1727745 using publish mode : ASYNC_PUBLISH_ALL
      1503 [main] INFO o.a.qpid.client.AMQSession - Prefetching delayed existing messages will not flow until requested via receive*() or setML().
      1586 [main] INFO o.a.q.c.AMQSession.Dispatcher - Dispatcher-Channel-1 created
      1586 [Dispatcher-Channel-1] INFO o.a.q.c.AMQSession.Dispatcher - Dispatcher-Channel-1 started

      Relevent client code:

      public class Client
      implements MessageListener, Closeable, ExceptionListener
      {
      public Client( JMSContext context )
      throws JMSException, NamingException

      { _connection = context.createConnection(); _session = context.createSession( _connection ); Destination requestQueue = context.lookupDestination( "iudex-brutefuzzy-request" ); _producer = _session.createProducer( requestQueue ); context.close(); _responseQueue = _session.createTemporaryQueue(); _session.createConsumer( _responseQueue ).setMessageListener(this); _connection.start(); }

      public void sendRequest( long simhash, boolean doAdd )
      throws JMSException, InterruptedException
      {
      Builder bldr = Request.newBuilder();
      bldr.setSimhash( simhash );
      bldr.setAction( doAdd ? RequestAction.ADD : RequestAction.CHECK_ONLY );

      BytesMessage response = _session.createBytesMessage();
      response.writeBytes( bldr.build().toByteArray() );

      if( _responseQueue != null )

      { response.setJMSReplyTo( _responseQueue ); }

      _semaphore.acquire();

      _producer.send( response );
      }

      @Override
      public void onMessage( Message msg )
      {
      try

      { //Handle response msg.acknowledge(); _semaphore.release(); }

      catch( JMSException x ) {
      if( _log.isDebugEnabled() ) _log.error( "onMessage:", x );
      else _log.error( "onMessage: {}", x.toString() );
      }
      }

      private Session _session;
      private MessageProducer _producer;
      private boolean _createTemporaryResponseQueue = true;
      private Destination _responseQueue = null;
      private Connection _connection = null;
      private final Semaphore _semaphore = new Semaphore( 1000 );
      private Logger _log = LoggerFactory.getLogger( getClass() );
      }

      Attachments

        Issue Links

          Activity

            People

              robbie Robbie Gemmell
              dekellum David Kellum
              Votes:
              0 Vote for this issue
              Watchers:
              0 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: