Details
Bug
Status: Resolved
Blocker
Resolution: Fixed
5.0.0
None
None
Linux
Description
I have two brokers on two different machines.
Broker A opens a duplex connection to Broker B using the following in its config:
<networkConnectors>
<networkConnector name="hostfora" uri="static://(tcp://hostforB:5675)" duplex="true" networkTTL="5" dynamicOnly="true"/>
</networkConnectors>
We have producers and consumers on both brokers, but the majority of the message flow is from the producers on A to the consumers on B. After a while, messages stop flowing. Thread dumps on both brokers reveal the following.
==================
Broker A Thread Dump
==================
"ActiveMQ Transport Status Monitor: openwire" daemon prio=10 tid=0xcdfc5000 nid=0x130f waiting on condition [0xcdab5000..0xcdab56f0]
java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at org.apache.activemq.broker.TransportStatusDetector.run(TransportStatusDetector.java:102)
at java.lang.Thread.run(Thread.java:619)
"ActiveMQ Transport Server: tcp://localhost:5675" daemon prio=10 tid=0xcdfc4800 nid=0x130e runnable [0xcdcff000..0xcdcff670]
java.lang.Thread.State: RUNNABLE
at java.net.PlainSocketImpl.socketAccept(Native Method)
at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:384)
- locked <0xd34062c8> (a java.net.SocksSocketImpl)
at java.net.ServerSocket.implAccept(ServerSocket.java:450)
at java.net.ServerSocket.accept(ServerSocket.java:421)
at org.apache.activemq.transport.tcp.TcpTransportServer.run(TcpTransportServer.java:197)
at java.lang.Thread.run(Thread.java:619)
INACTIVITY MONITORS
--------------------------------
"InactivityMonitor Async Task: java.util.concurrent.ThreadPoolExecutor$Worker@34a2a5" daemon prio=10 tid=0x081c3800 nid=0x2ac8 waiting on condition [0xca55d000..0xca55d570]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0xd34050c8> (a java.util.concurrent.SynchronousQueue$TransferStack)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:424)
at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:323)
at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:874)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:944)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:906)
at java.lang.Thread.run(Thread.java:619)
"InactivityMonitor Async Task: java.util.concurrent.ThreadPoolExecutor$Worker@12875ac" daemon prio=10 tid=0x08495000 nid=0x2ac7 waiting on condition [0xca3ff000..0xca3ff4f0]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0xd34050c8> (a java.util.concurrent.SynchronousQueue$TransferStack)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:424)
at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:323)
at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:874)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:944)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:906)
at java.lang.Thread.run(Thread.java:619)
"InactivityMonitor WriteCheck" prio=10 tid=0x0846ec00 nid=0x1311 in Object.wait() [0xcda13000..0xcda137f0]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.util.TimerThread.mainLoop(Timer.java:509)
- locked <0xd3408638> (a java.util.TaskQueue)
at java.util.TimerThread.run(Timer.java:462)
"InactivityMonitor ReadCheck" prio=10 tid=0x08425800 nid=0x1310 in Object.wait() [0xcda64000..0xcda64770]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.util.TimerThread.mainLoop(Timer.java:509)
- locked <0xd3408a30> (a java.util.TaskQueue)
at java.util.TimerThread.run(Timer.java:462)
Broker A - B CONNECTIONS
----------------------------------------
"ActiveMQ Transport: tcp://hostforB:5675" prio=10 tid=0x083d1000 nid=0x2696 runnable [0xca3ae000..0xca3ae770]
java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.read(SocketInputStream.java:129)
at org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50)
at org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:58)
at java.io.DataInputStream.readInt(DataInputStream.java:370)
at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268)
at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:192)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:184)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:172)
at java.lang.Thread.run(Thread.java:619)
====>
====> Blocked on write to broker B because broker B isn't reading?
====>
"ActiveMQ Connection Dispatcher: vm://localhost#18" daemon prio=10 tid=0x0849a800 nid=0x2695 runnable [0xcbaa8000..0xcbaa86f0]
java.lang.Thread.State: RUNNABLE
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92)
at java.net.SocketOutputStream.write(SocketOutputStream.java:136)
at org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush(TcpBufferedOutputStream.java:105)
at java.io.DataOutputStream.flush(DataOutputStream.java:106)
at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:154)
at org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:189)
- locked <0xd42a6548> (a org.apache.activemq.transport.InactivityMonitor$2)
at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:82)
at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:91)
at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
- locked <0xd42a6028> (a java.lang.Object)
at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:59)
at org.apache.activemq.network.DemandForwardingBridgeSupport.serviceLocalCommand(DemandForwardingBridgeSupport.java:578)
at org.apache.activemq.network.DemandForwardingBridgeSupport$1.onCommand(DemandForwardingBridgeSupport.java:138)
at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:100)
at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67)
at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:98)
at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
- locked <0xd4252740> (a java.lang.Object)
at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:59)
at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1151)
at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:766)
at org.apache.activemq.broker.TransportConnection.iterate(TransportConnection.java:801)
at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:98)
at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:36)
"VMTransport: vm://localhost#19" daemon prio=10 tid=0x08499000 nid=0x2694 in Object.wait() [0xca5ae000..0xca5ae670]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:485)
at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:105)
- locked <0xd4252468> (a java.lang.Object)
at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:36)
PRODUCER CONNECTIONS
----------------------------------------
"ActiveMQ Connection Dispatcher: /producer1:36897" daemon prio=10 tid=0x083e0800 nid=0x131c in Object.wait() [0xcd6e9000..0xcd6e9570]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0xd34574b8> (a java.lang.Object)
at java.lang.Object.wait(Object.java:485)
at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:105)
- locked <0xd34574b8> (a java.lang.Object)
at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:36)
"ActiveMQ Transport: tcp:///producer1:36897" daemon prio=10 tid=0x0846bc00 nid=0x131a runnable [0xcd78b000..0xcd78b470]
java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.read(SocketInputStream.java:129)
at org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50)
at org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:58)
at java.io.DataInputStream.readInt(DataInputStream.java:370)
at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268)
at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:192)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:184)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:172)
at java.lang.Thread.run(Thread.java:619)
===================
Broker B Thread Dump
===================
"ActiveMQ Transport Status Monitor: ssl" daemon prio=10 tid=0x0836e000 nid=0x61b9 waiting on condition [0xcd9ae000..0xcd9ae5f0]
java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at org.apache.activemq.broker.TransportStatusDetector.run(TransportStatusDetector.java:102)
at java.lang.Thread.run(Thread.java:619)
"ActiveMQ Transport Server: ssl://localhost:443" daemon prio=10 tid=0x08537c00 nid=0x61b8 runnable [0xcd9ff000..0xcd9ff570]
java.lang.Thread.State: RUNNABLE
at java.net.PlainSocketImpl.socketAccept(Native Method)
at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:384)
- locked <0xd37acfb0> (a java.net.SocksSocketImpl)
at java.net.ServerSocket.implAccept(ServerSocket.java:450)
at com.sun.net.ssl.internal.ssl.SSLServerSocketImpl.accept(SSLServerSocketImpl.java:259)
at org.apache.activemq.transport.tcp.TcpTransportServer.run(TcpTransportServer.java:197)
at java.lang.Thread.run(Thread.java:619)
"ActiveMQ Transport Status Monitor: openwire" daemon prio=10 tid=0xce21fc00 nid=0x61b7 waiting on condition [0xcdb56000..0xcdb566f0]
java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at org.apache.activemq.broker.TransportStatusDetector.run(TransportStatusDetector.java:102)
at java.lang.Thread.run(Thread.java:619)
"ActiveMQ Transport Server: tcp://localhost:5675" daemon prio=10 tid=0xcdeffc00 nid=0x61b6 runnable [0xcddff000..0xcddff670]
java.lang.Thread.State: RUNNABLE
at java.net.PlainSocketImpl.socketAccept(Native Method)
at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:384)
- locked <0xd37aca78> (a java.net.SocksSocketImpl)
at java.net.ServerSocket.implAccept(ServerSocket.java:450)
at java.net.ServerSocket.accept(ServerSocket.java:421)
at org.apache.activemq.transport.tcp.TcpTransportServer.run(TcpTransportServer.java:197)
at java.lang.Thread.run(Thread.java:619)
INACTIVITY MONITORS
--------------------------------
"InactivityMonitor Async Task: java.util.concurrent.ThreadPoolExecutor$Worker@704cf5" daemon prio=10 tid=0x081af400 nid=0x6498 waiting on condition [0xcba92000..0xcba926f0]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0xd346d318> (a java.util.concurrent.SynchronousQueue$TransferStack)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:424)
at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:323)
at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:874)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:944)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:906)
at java.lang.Thread.run(Thread.java:619)
"InactivityMonitor Async Task: java.util.concurrent.ThreadPoolExecutor$Worker@b8d09d" daemon prio=10 tid=0x081aec00 nid=0x6452 waiting for monitor entry [0xcd70b000..0xcd70c570]
java.lang.Thread.State: BLOCKED (on object monitor)
at com.sun.net.ssl.internal.ssl.SSLSocketImpl.writeRecord(SSLSocketImpl.java:661)
- waiting to lock <0xd3791d18> (a java.lang.Object)
at com.sun.net.ssl.internal.ssl.SSLSocketImpl.sendAlert(SSLSocketImpl.java:1624)
at com.sun.net.ssl.internal.ssl.SSLSocketImpl.warning(SSLSocketImpl.java:1477)
at com.sun.net.ssl.internal.ssl.SSLSocketImpl.closeInternal(SSLSocketImpl.java:1279)
- locked <0xd3791408> (a com.sun.net.ssl.internal.ssl.SSLSocketImpl)
at com.sun.net.ssl.internal.ssl.SSLSocketImpl.close(SSLSocketImpl.java:1218)
at org.apache.activemq.transport.tcp.TcpTransport.doStop(TcpTransport.java:430)
at org.apache.activemq.util.ServiceSupport.stop(ServiceSupport.java:57)
at org.apache.activemq.transport.tcp.TcpTransport.stop(TcpTransport.java:439)
at org.apache.activemq.transport.InactivityMonitor.stop(InactivityMonitor.java:90)
at org.apache.activemq.transport.TransportFilter.stop(TransportFilter.java:63)
at org.apache.activemq.transport.WireFormatNegotiator.stop(WireFormatNegotiator.java:78)
at org.apache.activemq.transport.TransportFilter.stop(TransportFilter.java:63)
at org.apache.activemq.broker.TransportConnection.disposeTransport(TransportConnection.java:1239)
at org.apache.activemq.broker.TransportConnection.doStop(TransportConnection.java:917)
at org.apache.activemq.broker.jmx.ManagedTransportConnection.doStop(ManagedTransportConnection.java:74)
at org.apache.activemq.broker.TransportConnection.stop(TransportConnection.java:876)
at org.apache.activemq.util.ServiceSupport.dispose(ServiceSupport.java:40)
at org.apache.activemq.broker.TransportConnection.serviceTransportException(TransportConnection.java:206)
at org.apache.activemq.broker.TransportConnection$1.onException(TransportConnection.java:185)
at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:98)
at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:98)
at org.apache.activemq.transport.WireFormatNegotiator.onException(WireFormatNegotiator.java:143)
at org.apache.activemq.transport.InactivityMonitor.onException(InactivityMonitor.java:201)
at org.apache.activemq.transport.InactivityMonitor$5.run(InactivityMonitor.java:139)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
at java.lang.Thread.run(Thread.java:619)
"InactivityMonitor WriteCheck" daemon prio=10 tid=0x083c3c00 nid=0x61bb in Object.wait() [0xcd7ff000..0xcd7ff4f0]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0xd37d1e58> (a java.util.TaskQueue)
at java.util.TimerThread.mainLoop(Timer.java:509)
- locked <0xd37d1e58> (a java.util.TaskQueue)
at java.util.TimerThread.run(Timer.java:462)
"InactivityMonitor ReadCheck" daemon prio=10 tid=0x083c4800 nid=0x61ba in Object.wait() [0xcd95d000..0xcd95d470]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0xd37ae218> (a java.util.TaskQueue)
at java.util.TimerThread.mainLoop(Timer.java:509)
- locked <0xd37ae218> (a java.util.TaskQueue)
at java.util.TimerThread.run(Timer.java:462)
Broker A - B CONNECTIONS
---------------------------------------
"ActiveMQ Connection Dispatcher: vm://localhost#0" daemon prio=10 tid=0x082f5000 nid=0x61e8 waiting for monitor entry [0xccb57000..0xccb57570]
java.lang.Thread.State: BLOCKED (on object monitor)
at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:39)
- waiting to lock <0xd3782640> (a java.lang.Object)
at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:59)
at org.apache.activemq.network.DemandForwardingBridgeSupport.serviceLocalCommand(DemandForwardingBridgeSupport.java:579)
at org.apache.activemq.network.DemandForwardingBridgeSupport$1.onCommand(DemandForwardingBridgeSupport.java:138)
at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:100)
at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67)
at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:98)
at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
- locked <0xd3782a58> (a java.lang.Object)
at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:59)
at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1151)
at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:766)
at org.apache.activemq.broker.TransportConnection.iterate(TransportConnection.java:801)
at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:98)
at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:36)
"VMTransport: vm://localhost#1" daemon prio=10 tid=0x082f4000 nid=0x61e7 runnable [0xccba7000..0xccba86f0]
java.lang.Thread.State: RUNNABLE
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92)
at java.net.SocketOutputStream.write(SocketOutputStream.java:136)
at com.sun.net.ssl.internal.ssl.OutputRecord.writeBuffer(OutputRecord.java:283)
at com.sun.net.ssl.internal.ssl.OutputRecord.write(OutputRecord.java:272)
at com.sun.net.ssl.internal.ssl.SSLSocketImpl.writeRecord(SSLSocketImpl.java:665)
- locked <0xd3791d18> (a java.lang.Object)
at com.sun.net.ssl.internal.ssl.AppOutputStream.write(AppOutputStream.java:59)
- locked <0xd3793090> (a com.sun.net.ssl.internal.ssl.AppOutputStream)
at org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush(TcpBufferedOutputStream.java:105)
at java.io.DataOutputStream.flush(DataOutputStream.java:106)
at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:154)
at org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:189)
- locked <0xd3793868> (a org.apache.activemq.transport.InactivityMonitor$2)
at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:82)
at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:91)
at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
- locked <0xd3793958> (a java.lang.Object)
at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1151)
at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:766)
at org.apache.activemq.broker.TransportConnection.dispatchSync(TransportConnection.java:727)
at org.apache.activemq.broker.region.TopicSubscription.dispatch(TopicSubscription.java:427)
at org.apache.activemq.broker.region.TopicSubscription.add(TopicSubscription.java:93)
at org.apache.activemq.broker.region.policy.SimpleDispatchPolicy.dispatch(SimpleDispatchPolicy.java:48)
at org.apache.activemq.broker.region.Topic.dispatch(Topic.java:601)
at org.apache.activemq.broker.region.Topic.doMessageSend(Topic.java:427)
- locked <0xd3609748> (a org.apache.activemq.broker.region.Topic)
at org.apache.activemq.broker.region.Topic.send(Topic.java:371)
at org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:328)
at org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:402)
at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:224)
at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:125)
at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:95)
at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:125)
at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:135)
at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:434)
at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:623)
at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:281)
at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:178)
at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:100)
at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67)
at org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:202)
at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:98)
at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:36)
====>
====> Nothing to dispatch from broker B
====>
"ActiveMQ Connection Dispatcher: /hostforA:50665" daemon prio=10 tid=0x08431800 nid=0x61e4 in Object.wait() [0xccc4a000..0xccc4a770]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0xd37a6010> (a java.lang.Object)
at java.lang.Object.wait(Object.java:485)
at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:105)
- locked <0xd37a6010> (a java.lang.Object)
at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:36)
====>
====> Queue of traffic to broker A is full - and because this is the reading thread it also stops reading....
====>
"ActiveMQ Transport: tcp:///hostforA:50665" daemon prio=10 tid=0x08430400 nid=0x61e3 waiting on condition [0xccc9b000..0xccc9b4f0]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0xd37827e8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1889)
at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:254)
at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:92)
at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
- locked <0xd3782640> (a java.lang.Object)
at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:59)
at org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteCommand(DemandForwardingBridgeSupport.java:403)
at org.apache.activemq.network.DemandForwardingBridgeSupport$2.onCommand(DemandForwardingBridgeSupport.java:149)
at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:100)
at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67)
at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:134)
at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:170)
- locked <0xd3779350> (a org.apache.activemq.transport.InactivityMonitor$1)
at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:185)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:172)
at java.lang.Thread.run(Thread.java:619)
CONSUMER CONNECTIONS
----------------------------------------
"ActiveMQ Connection Dispatcher: /consumer1:2556" daemon prio=10 tid=0x085b9000 nid=0x61c8 in Object.wait() [0xcd484000..0xcd484570]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0xd37bfd00> (a java.lang.Object)
at java.lang.Object.wait(Object.java:485)
at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:105)
- locked <0xd37bfd00> (a java.lang.Object)
at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:36)
"ActiveMQ Transport: ssl:///consumer1:2556" daemon prio=10 tid=0x085be000 nid=0x61c2 runnable [0xcd66a000..0xcd66a470]
java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.read(SocketInputStream.java:129)
at com.sun.net.ssl.internal.ssl.InputRecord.readFully(InputRecord.java:293)
at com.sun.net.ssl.internal.ssl.InputRecord.read(InputRecord.java:331)
at com.sun.net.ssl.internal.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:722)
- locked <0xd37c0af0> (a java.lang.Object)
at com.sun.net.ssl.internal.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:679)
at com.sun.net.ssl.internal.ssl.AppInputStream.read(AppInputStream.java:75)
- locked <0xd37c0e30> (a com.sun.net.ssl.internal.ssl.AppInputStream)
at org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50)
at org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:58)
at java.io.DataInputStream.readInt(DataInputStream.java:370)
at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268)
at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:192)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:184)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:172)
at java.lang.Thread.run(Thread.java:619)
It seems that B gets stuck because the queue in the VMTransport fills up. The thread in B that stops both blocks removal of items from the queue and is responsible for reading data from the A-to-B socket. Therefore, all communication between A and B stops.
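
To illustrate the pattern I think is at work, here is a minimal standalone sketch. It is not ActiveMQ code: the class name BoundedQueueStall, the method name, the capacity, and the timeout are all made up for illustration. It models a reader thread handing commands to a bounded LinkedBlockingQueue that nobody drains, which is roughly the situation in the "ActiveMQ Transport: tcp:///hostforA:50665" thread above. The sketch uses offer() with a timeout so that it terminates; the broker thread in the dump is in put(), which would wait indefinitely.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedQueueStall {

    /**
     * Hypothetical model of the reader thread: each command "read from the
     * socket" is handed to a bounded queue. Returns how many commands were
     * accepted before the hand-off timed out, i.e. the point at which a
     * blocking put() would have hung and the socket would stop being read.
     */
    static int commandsAcceptedBeforeStall(int queueCapacity, int commandsToRead) {
        BlockingQueue<String> vmQueue = new LinkedBlockingQueue<>(queueCapacity);
        // Nothing takes from vmQueue. This stands in for a consumer of the
        // queue that is itself blocked and never removes items.
        int accepted = 0;
        try {
            for (int i = 0; i < commandsToRead; i++) {
                // offer() with a timeout instead of put(), so the sketch ends.
                boolean ok = vmQueue.offer("command-" + i, 50, TimeUnit.MILLISECONDS);
                if (!ok) {
                    break; // queue is full: the reader would stall here
                }
                accepted++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
        }
        return accepted;
    }

    public static void main(String[] args) {
        int accepted = commandsAcceptedBeforeStall(3, 10);
        System.out.println("accepted before stall: " + accepted);
    }
}
```

With a capacity of 3 and 10 commands to read, the reader accepts 3 commands and then stalls. Once the reader stalls it no longer reads from its socket, so the peer's writes eventually block as well, which would match the socketWrite0 frames in the Broker A dump.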