Details
Type: Bug
Status: Open
Priority: Critical
Resolution: Unresolved
Affects Version/s: qpid-java-broker-7.0.9, qpid-java-broker-9.2.0
None
None
Description
For our use case we create 3 connections with 40 sessions per connection, i.e. 120 (3*40) sessions in total. We have 8000 queues in total, and every session creates one consumer on each of these 8000 queues, so each queue has 120 consumers ready to process messages.
connectionUrl = "amqp://<name>:<password>@test/?brokerlist='tcp://localhost:5672?tcp_nodelay='true''&maxprefetch='1'";
queueAddress = q + ";{create: always , node : {type : queue, durable : true, x-declare:{arguments:{'x-qpid-priorities':10}}}}";
Note that we use a prefetch value of 1 (maxprefetch='1'), so window-based flow control is enabled.
In our production environment we notice that even when only a single queue, say Q0, is used to send and receive messages, the receivedComplete() method iterates over all the sessions of a connection and, for each session, over all 8000 consumers, calling flushCreditState() on each to restore the credit. Since only one consumer has actually received a message, calling flushCreditState() on the rest of the consumers is a no-op and needlessly wastes a lot of CPU cycles.
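To put a rough number on this, the arithmetic below uses only the figures from this report (40 sessions per connection, 8000 consumers per session). The class and method names are made up for illustration and are not Broker-J code.

```java
/** Illustrative arithmetic only; names here are hypothetical, not Broker-J APIs. */
class CreditFlushCost {

    /** Number of flushCreditState() calls made when every session of one
     *  connection walks all of its consumers once. */
    static long flushCallsPerPass(int sessionsPerConnection, int consumersPerSession) {
        return (long) sessionsPerConnection * consumersPerSession;
    }

    public static void main(String[] args) {
        long calls = flushCallsPerPass(40, 8000);
        System.out.println(calls);     // prints 320000
        // If only one consumer actually received a message, the rest are no-ops.
        System.out.println(calls - 1); // prints 319999
    }
}
```

So, under these assumptions, a single pass over one connection makes about 320,000 calls to restore credit for one consumer.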
The following stacks, captured by thread profiling, together consume more than 60% of the CPU.
[0] 52.13% 74573 org.apache.qpid.server.protocol.v?.ServerSession.lambda$receivedComplete$1
[1] 52.13% 74573 org.apache.qpid.server.protocol.v?.ServerSession$$Lambda$?.?.run
[2] 52.13% 74573 java.security.AccessController.doPrivileged
[3] 52.13% 74573 org.apache.qpid.server.protocol.v?.ServerSession.runAsSubject
[4] 52.13% 74573 org.apache.qpid.server.protocol.v?.ServerSession.receivedComplete
[5] 52.13% 74573 org.apache.qpid.server.protocol.v?.ServerConnection.receivedComplete
[6] 52.13% 74573 org.apache.qpid.server.protocol.v?.AMQPConnection?Impl.lambda$received$1
[7] 52.13% 74573 org.apache.qpid.server.protocol.v?.AMQPConnection?Impl$$Lambda$?.?.run
[8] 52.13% 74573 java.security.AccessController.doPrivileged
[9] 52.13% 74573 org.apache.qpid.server.protocol.v?.AMQPConnection?Impl.received
[10] 52.13% 74573 org.apache.qpid.server.transport.MultiVersionProtocolEngine.received
[11] 52.13% 74573 org.apache.qpid.server.transport.NonBlockingConnection.processAmqpData
[12] 52.13% 74573 org.apache.qpid.server.transport.NonBlockingConnectionPlainDelegate.processData
[13] 52.13% 74573 org.apache.qpid.server.transport.NonBlockingConnection.doRead
[14] 52.13% 74573 org.apache.qpid.server.transport.NonBlockingConnection.doWork
[15] 52.13% 74573 org.apache.qpid.server.transport.NetworkConnectionScheduler.processConnection
[16] 52.13% 74573 org.apache.qpid.server.transport.SelectorThread$ConnectionProcessor.processConnection
[17] 49.53% 70850 org.apache.qpid.server.transport.SelectorThread$SelectionTask.performSelect
[18] 49.53% 70850 org.apache.qpid.server.transport.SelectorThread$SelectionTask.run
[19] 49.53% 70850 org.apache.qpid.server.transport.SelectorThread.run
[20] 49.53% 70850 java.util.concurrent.ThreadPoolExecutor.runWorker
[21] 49.53% 70850 java.util.concurrent.ThreadPoolExecutor$Worker.run
[22] 49.53% 70850 org.apache.qpid.server.bytebuffer.QpidByteBufferFactory.lambda$createQpidByteBufferTrackingThreadFactory$0
[23] 49.53% 70850 org.apache.qpid.server.bytebuffer.QpidByteBufferFactory$$Lambda$?.?.run
[24] 49.53% 70850 java.lang.Thread.run
[0] 8.45% 12083 org.apache.qpid.server.protocol.v?.ConsumerTarget?.restoreCredit
[1] 8.45% 12083 org.apache.qpid.server.protocol.v?.ConsumerTarget?.flushCreditState
[2] 8.45% 12082 org.apache.qpid.server.protocol.v?.ServerSession.lambda$receivedComplete$1
[3] 8.45% 12082 org.apache.qpid.server.protocol.v?.ServerSession$$Lambda$?.?.run
[4] 8.45% 12082 java.security.AccessController.doPrivileged
[5] 8.45% 12082 org.apache.qpid.server.protocol.v?.ServerSession.runAsSubject
[6] 8.45% 12082 org.apache.qpid.server.protocol.v?.ServerSession.receivedComplete
[7] 8.45% 12082 org.apache.qpid.server.protocol.v?.ServerConnection.receivedComplete
[8] 8.45% 12082 org.apache.qpid.server.protocol.v?.AMQPConnection?Impl.lambda$received$1
[9] 8.45% 12082 org.apache.qpid.server.protocol.v?.AMQPConnection?Impl$$Lambda$?.?.run
[10] 8.45% 12082 java.security.AccessController.doPrivileged
[11] 8.45% 12082 org.apache.qpid.server.protocol.v?.AMQPConnection?Impl.received
[12] 8.45% 12082 org.apache.qpid.server.transport.MultiVersionProtocolEngine.received
[13] 8.45% 12082 org.apache.qpid.server.transport.NonBlockingConnection.processAmqpData
[14] 8.45% 12082 org.apache.qpid.server.transport.NonBlockingConnectionPlainDelegate.processData
[15] 8.45% 12082 org.apache.qpid.server.transport.NonBlockingConnection.doRead
[16] 8.45% 12082 org.apache.qpid.server.transport.NonBlockingConnection.doWork
[17] 8.45% 12082 org.apache.qpid.server.transport.NetworkConnectionScheduler.processConnection
[18] 8.45% 12082 org.apache.qpid.server.transport.SelectorThread$ConnectionProcessor.processConnection
[19] 8.02% 11476 org.apache.qpid.server.transport.SelectorThread$SelectionTask.performSelect
[19] 0.42% 606 org.apache.qpid.server.transport.SelectorThread$ConnectionProcessor.run
I am also attaching a YourKit snapshot, which shows the above stacks consuming a lot of CPU cycles.
void receivedComplete()
{
    runAsSubject(() -> {
        final Collection<ConsumerTarget_0_10> subscriptions = getSubscriptions();
        for (ConsumerTarget_0_10 subscription_0_10 : subscriptions)
        {
            subscription_0_10.flushCreditState(false);
        }
        awaitCommandCompletion();
        return null;
    });
}
We could optimise this code by maintaining a list (or similar state) of the consumers whose credit actually needs to be flushed, and iterating only over those for each session. Please let me know if there is a gap in my understanding and this code serves some other purpose as well.
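A minimal sketch of the idea, to make the proposal concrete. This is not Broker-J code: SketchSession, SketchConsumer, markCreditDirty and onMessageDelivered are hypothetical stand-ins for ServerSession, ConsumerTarget_0_10 and whatever hook would record that a consumer's credit changed. It also assumes that a session is only touched from one thread at a time; a real change would need to check that against the broker's threading model.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for a consumer target; not the Broker-J class. */
class SketchConsumer {
    private final SketchSession session;
    int flushCalls; // counts flushes, for illustration only

    SketchConsumer(SketchSession session) {
        this.session = session;
    }

    /** Assumed hook: called when a message is delivered to this consumer. */
    void onMessageDelivered() {
        session.markCreditDirty(this);
    }

    /** Stand-in for flushCreditState(); here it only records the call. */
    void flushCreditState() {
        flushCalls++;
    }
}

/** Hypothetical stand-in for a session that tracks which consumers need a flush. */
class SketchSession {
    private final List<SketchConsumer> consumers = new ArrayList<>();
    // Only consumers whose credit state changed since the last flush.
    private final Set<SketchConsumer> dirty = new LinkedHashSet<>();

    SketchConsumer addConsumer() {
        SketchConsumer c = new SketchConsumer(this);
        consumers.add(c);
        return c;
    }

    SketchConsumer consumer(int index) {
        return consumers.get(index);
    }

    void markCreditDirty(SketchConsumer c) {
        dirty.add(c);
    }

    /** Flushes only the dirty consumers; returns how many were flushed. */
    int receivedComplete() {
        int flushed = dirty.size();
        for (SketchConsumer c : dirty) {
            c.flushCreditState();
        }
        dirty.clear();
        return flushed;
    }
}

class DirtyCreditSketch {
    public static void main(String[] args) {
        SketchSession session = new SketchSession();
        for (int i = 0; i < 8000; i++) {
            session.addConsumer();
        }
        session.consumer(0).onMessageDelivered();       // only Q0's consumer is active
        System.out.println(session.receivedComplete()); // prints 1
        System.out.println(session.receivedComplete()); // prints 0
    }
}
```

With this bookkeeping, the work done in receivedComplete() is proportional to the number of consumers that received messages, not to the total number of consumers on the session. Whether flushCreditState() has other side effects that rely on being called for every consumer is exactly the open question raised above.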