Details
Type: Bug
Status: Closed
Priority: Major
Resolution: Fixed
0.8
None
Fedora 14: 2.6.35.10-74.fc14.x86_64
Broker: qpidd (qpidc) version 0.8
Java Client 0.8
OpenJDK Runtime Environment (IcedTea6 1.9.3) (fedora-49.1.9.3.fc14-x86_64)
OpenJDK 64-Bit Server VM (build 19.0-b09, mixed mode)
Description
I'm using the Java 0.8 client JMS interface. Relevant Client code is below.
If I make async requests to a queue with setJMSReplyTo() pointing at a temporary queue for responses, the client process runs out of memory after about 3.2M request/response message pairs. This is with a 512MB max Java heap, and heap growth appears to be consistently linear. Note that I use a semaphore between request and response to keep at most ~1000 unanswered requests open in the client, blocking before sending more. Thus this should not be a matter of simply saturating the client with unsent messages.
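The throttling described above can be sketched without a broker. This is a minimal illustration only: the class name ThrottleSketch, the thread pool standing in for the broker and the message listener, and the peak counter are all assumptions made for the example, not part of the reported client. It shows why a 1000-permit semaphore bounds the number of unanswered requests regardless of how many are sent in total.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the request/response loop; no JMS involved.
class ThrottleSketch {
    static final int MAX_IN_FLIGHT = 1000;

    /** Sends totalRequests simulated requests and returns the peak number unanswered at once. */
    static int run(int totalRequests) {
        Semaphore permits = new Semaphore(MAX_IN_FLIGHT);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        // The pool plays the role of broker plus onMessage() callback thread.
        ExecutorService responder = Executors.newFixedThreadPool(4);
        try {
            for (int i = 0; i < totalRequests; i++) {
                // Blocks once MAX_IN_FLIGHT requests are still unanswered.
                permits.acquireUninterruptibly();
                peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                responder.execute(() -> {
                    // Simulated response: count it, then free a permit for the sender.
                    inFlight.decrementAndGet();
                    permits.release();
                });
            }
        } finally {
            responder.shutdown();
        }
        try {
            responder.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak unanswered requests: " + run(100_000));
    }
}
```

Because the counter is incremented only after a permit is acquired and decremented before the permit is released, the peak can never exceed MAX_IN_FLIGHT, so per-request state that is released on response stays bounded.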
Here is the top of the jhat histogram from a jmap heap dump taken shortly before the client runs out of memory:
Class                                                    Instance Count  Total Size
class org.apache.qpid.collections.ReferenceMap$SoftRef          3253120   143137280
class org.apache.qpid.collections.ReferenceMap$Entry            3253120   117112320
class [Lorg.apache.qpid.collections.ReferenceMap$Entry;               1    67108880
class org.apache.qpid.transport.ReplyTo                         3253120    61809280
Note that 3253120 appears to match the total number of request/replies successfully processed by the client up to this point. In other words, it's leaking one ReplyTo object and its associated (not so?) soft references per request/response.
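As a rough cross-check of the histogram, the arithmetic can be worked through in a few lines. This sketch takes the jhat sizes at face value (they may not include every per-object overhead), and the class and method names are made up for the example.

```java
// Hypothetical helper: arithmetic on the four histogram rows quoted above.
class HistogramMath {
    static final long INSTANCES      = 3_253_120L;   // SoftRef, Entry and ReplyTo counts
    static final long SOFT_REF_BYTES = 143_137_280L;
    static final long ENTRY_BYTES    = 117_112_320L;
    static final long TABLE_BYTES    = 67_108_880L;  // the single Entry[] array
    static final long REPLY_TO_BYTES = 61_809_280L;

    /** Bytes retained per request/response pair, excluding the shared table array. */
    static long perRequestBytes() {
        return (SOFT_REF_BYTES + ENTRY_BYTES + REPLY_TO_BYTES) / INSTANCES;
    }

    /** Total bytes across the four histogram rows. */
    static long totalBytes() {
        return SOFT_REF_BYTES + ENTRY_BYTES + TABLE_BYTES + REPLY_TO_BYTES;
    }

    public static void main(String[] args) {
        System.out.println("bytes per request: " + perRequestBytes());
        System.out.println("total bytes: " + totalBytes());
        System.out.printf("total MiB: %.1f%n", totalBytes() / (1024.0 * 1024.0));
    }
}
```

Each row divides evenly by the instance count (44 + 36 + 19 = 99 bytes per pair), and the four rows together come to 389167760 bytes, roughly 371 MiB of the 512MB heap, which is consistent with linear growth of one retained entry per request.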
If I instead replace the temporary queue with a fixed response queue and drop the use of setJMSReplyTo(), the client works fine, with no memory leak.
Below are more details when running with the temp response queue:
% qpid-config queues
Queue Name                                     Attributes
======================================================================
TempQueued4051d9d-37d3-4306-a1fc-91b93f7082c8  auto-del excl
iudex-brutefuzzy-request                       --max-queue-size=100000 --limit-policy=reject
Client startup Log:
635 [main] INFO o.a.q.j.PropertiesFileInitialContextFactory - No Provider URL specified.
726 [main] INFO o.a.qpid.client.AMQConnection - Connection:amqp://qpid:********@default-client/default-vhost?brokerlist='tcp://localhost:5672'
973 [main] INFO o.a.q.c.p.AMQProtocolSession - Using ProtocolVersion for Session:0-10
990 [main] INFO o.a.q.c.h.ClientMethodDispatcherImpl - New Method Dispatcher:AMQProtocolSession[null]
1002 [main] INFO o.a.qpid.client.AMQConnection - Connecting with ProtocolHandler Version:0-10
1150 [main] INFO o.a.qpid.client.AMQConnection - Connected with ProtocolHandler Version:0-10
1192 [main] INFO o.a.qpid.client.AMQSession - Created session:org.apache.qpid.client.AMQSession_0_10@1b7c63f
1280 [main] INFO o.a.q.c.BasicMessageProducer_0_10 - MessageProducer org.apache.qpid.client.BasicMessageProducer_0_10@1727745 using publish mode : ASYNC_PUBLISH_ALL
1503 [main] INFO o.a.qpid.client.AMQSession - Prefetching delayed existing messages will not flow until requested via receive*() or setML().
1586 [main] INFO o.a.q.c.AMQSession.Dispatcher - Dispatcher-Channel-1 created
1586 [Dispatcher-Channel-1] INFO o.a.q.c.AMQSession.Dispatcher - Dispatcher-Channel-1 started
Relevant client code:
public class Client
    implements MessageListener, Closeable, ExceptionListener
{
    public Client( JMSContext context )
        throws JMSException, NamingException
    {
        // ... (constructor body omitted)
    }

    public void sendRequest( long simhash, boolean doAdd )
        throws JMSException, InterruptedException
    {
        Builder bldr = Request.newBuilder();
        bldr.setSimhash( simhash );
        bldr.setAction( doAdd ? RequestAction.ADD : RequestAction.CHECK_ONLY );

        // Despite its name, this is the outgoing request message.
        BytesMessage response = _session.createBytesMessage();
        response.writeBytes( bldr.build().toByteArray() );

        if( _responseQueue != null ) {
            response.setJMSReplyTo( _responseQueue );
        }

        // Block while ~1000 requests are still unanswered.
        _semaphore.acquire();
        _producer.send( response );
    }

    @Override
    public void onMessage( Message msg )
    {
        try {
            // ... (response handling omitted)
        }
        catch( JMSException x ) {
            if( _log.isDebugEnabled() ) _log.error( "onMessage:", x );
            else _log.error( "onMessage: {}", x.toString() );
        }
    }

    private Session _session;
    private MessageProducer _producer;
    private boolean _createTemporaryResponseQueue = true;
    private Destination _responseQueue = null;
    private Connection _connection = null;
    private final Semaphore _semaphore = new Semaphore( 1000 );
    private Logger _log = LoggerFactory.getLogger( getClass() );
}
Issue Links
relates to: QPID-3440 the reply-to Destination cache in AMQMessageDelegate_0_10 does not work (Closed)