Uploaded image for project: 'ActiveMQ Classic'
  1. ActiveMQ Classic
  2. AMQ-1641

Broker Network Deadlocking

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Blocker
    • Resolution: Fixed
    • 5.0.0
    • 5.1.0
    • None
    • None
    • Linux

    Description

      I have two brokers, on two different machines.

      Broker A opens a duplex connection to Broker B, by having the following in its config:

      <networkConnectors>
      <networkConnector name="hostfora" uri="static://(tcp://hostforB:5675)" duplex="true" networkTTL="5" dynamicOnly="true"/>
      </networkConnectors>

      We have producers and consumers on both brokers, but the majority of the message flow is from the producers on A to the consumers on B. After a while, messages stop flowing. Thread dumps on both brokers reveal the following.

      ==================
      Broker A Thread Dump
      ==================

      "ActiveMQ Transport Status Monitor: openwire" daemon prio=10 tid=0xcdfc5000 nid=0x130f waiting on condition [0xcdab5000..0xcdab56f0]
      java.lang.Thread.State: TIMED_WAITING (sleeping)
      at java.lang.Thread.sleep(Native Method)
      at org.apache.activemq.broker.TransportStatusDetector.run(TransportStatusDetector.java:102)
      at java.lang.Thread.run(Thread.java:619)

      "ActiveMQ Transport Server: tcp://localhost:5675" daemon prio=10 tid=0xcdfc4800 nid=0x130e runnable [0xcdcff000..0xcdcff670]
      java.lang.Thread.State: RUNNABLE
      at java.net.PlainSocketImpl.socketAccept(Native Method)
      at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:384)

      • locked <0xd34062c8> (a java.net.SocksSocketImpl)
        at java.net.ServerSocket.implAccept(ServerSocket.java:450)
        at java.net.ServerSocket.accept(ServerSocket.java:421)
        at org.apache.activemq.transport.tcp.TcpTransportServer.run(TcpTransportServer.java:197)
        at java.lang.Thread.run(Thread.java:619)

      INACTIVITY MONITORS
      --------------------------------

      "InactivityMonitor Async Task: java.util.concurrent.ThreadPoolExecutor$Worker@34a2a5" daemon prio=10 tid=0x081c3800 nid=0x2ac8 waiting on condition [0xca55d000..0xca55d570]
      java.lang.Thread.State: TIMED_WAITING (parking)
      at sun.misc.Unsafe.park(Native Method)

      • parking to wait for <0xd34050c8> (a java.util.concurrent.SynchronousQueue$TransferStack)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
        at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:424)
        at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:323)
        at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:874)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:944)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:906)
        at java.lang.Thread.run(Thread.java:619)

      "InactivityMonitor Async Task: java.util.concurrent.ThreadPoolExecutor$Worker@12875ac" daemon prio=10 tid=0x08495000 nid=0x2ac7 waiting on condition [0xca3ff000..0xca3ff4f0]
      java.lang.Thread.State: TIMED_WAITING (parking)
      at sun.misc.Unsafe.park(Native Method)

      • parking to wait for <0xd34050c8> (a java.util.concurrent.SynchronousQueue$TransferStack)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
        at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:424)
        at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:323)
        at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:874)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:944)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:906)
        at java.lang.Thread.run(Thread.java:619)

      "InactivityMonitor WriteCheck" prio=10 tid=0x0846ec00 nid=0x1311 in Object.wait() [0xcda13000..0xcda137f0]
      java.lang.Thread.State: TIMED_WAITING (on object monitor)
      at java.lang.Object.wait(Native Method)
      at java.util.TimerThread.mainLoop(Timer.java:509)

      • locked <0xd3408638> (a java.util.TaskQueue)
        at java.util.TimerThread.run(Timer.java:462)

      "InactivityMonitor ReadCheck" prio=10 tid=0x08425800 nid=0x1310 in Object.wait() [0xcda64000..0xcda64770]
      java.lang.Thread.State: TIMED_WAITING (on object monitor)
      at java.lang.Object.wait(Native Method)
      at java.util.TimerThread.mainLoop(Timer.java:509)

      • locked <0xd3408a30> (a java.util.TaskQueue)
        at java.util.TimerThread.run(Timer.java:462)

      Broker A - B CONNECTIONS
      ----------------------------------------

      "ActiveMQ Transport: tcp://hostforB:5675" prio=10 tid=0x083d1000 nid=0x2696 runnable [0xca3ae000..0xca3ae770]
      java.lang.Thread.State: RUNNABLE
      at java.net.SocketInputStream.socketRead0(Native Method)
      at java.net.SocketInputStream.read(SocketInputStream.java:129)
      at org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50)
      at org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:58)
      at java.io.DataInputStream.readInt(DataInputStream.java:370)
      at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268)
      at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:192)
      at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:184)
      at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:172)
      at java.lang.Thread.run(Thread.java:619)

      ====>
      ====> Blocked on write to broker A because broker A isn't reading?
      ====>
      "ActiveMQ Connection Dispatcher: vm://localhost#18" daemon prio=10 tid=0x0849a800 nid=0x2695 runnable [0xcbaa8000..0xcbaa86f0]
      java.lang.Thread.State: RUNNABLE
      at java.net.SocketOutputStream.socketWrite0(Native Method)
      at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92)
      at java.net.SocketOutputStream.write(SocketOutputStream.java:136)
      at org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush(TcpBufferedOutputStream.java:105)
      at java.io.DataOutputStream.flush(DataOutputStream.java:106)
      at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:154)
      at org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:189)

      • locked <0xd42a6548> (a org.apache.activemq.transport.InactivityMonitor$2)
        at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:82)
        at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:91)
        at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
      • locked <0xd42a6028> (a java.lang.Object)
        at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:59)
        at org.apache.activemq.network.DemandForwardingBridgeSupport.serviceLocalCommand(DemandForwardingBridgeSupport.java:578)
        at org.apache.activemq.network.DemandForwardingBridgeSupport$1.onCommand(DemandForwardingBridgeSupport.java:138)
        at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:100)
        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67)
        at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:98)
        at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
      • locked <0xd4252740> (a java.lang.Object)
        at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:59)
        at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1151)
        at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:766)
        at org.apache.activemq.broker.TransportConnection.iterate(TransportConnection.java:801)
        at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:98)
        at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:36)

      "VMTransport: vm://localhost#19" daemon prio=10 tid=0x08499000 nid=0x2694 in Object.wait() [0xca5ae000..0xca5ae670]
      java.lang.Thread.State: WAITING (on object monitor)
      at java.lang.Object.wait(Native Method)
      at java.lang.Object.wait(Object.java:485)
      at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:105)

      • locked <0xd4252468> (a java.lang.Object)
        at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:36)

      PRODUCER CONNECTIONS
      ----------------------------------------

      "ActiveMQ Connection Dispatcher: /producer1:36897" daemon prio=10 tid=0x083e0800 nid=0x131c in Object.wait() [0xcd6e9000..0xcd6e9570]
      java.lang.Thread.State: WAITING (on object monitor)
      at java.lang.Object.wait(Native Method)

      • waiting on <0xd34574b8> (a java.lang.Object)
        at java.lang.Object.wait(Object.java:485)
        at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:105)
      • locked <0xd34574b8> (a java.lang.Object)
        at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:36)

      "ActiveMQ Transport: tcp:///producer1:36897" daemon prio=10 tid=0x0846bc00 nid=0x131a runnable [0xcd78b000..0xcd78b470]
      java.lang.Thread.State: RUNNABLE
      at java.net.SocketInputStream.socketRead0(Native Method)
      at java.net.SocketInputStream.read(SocketInputStream.java:129)
      at org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50)
      at org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:58)
      at java.io.DataInputStream.readInt(DataInputStream.java:370)
      at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268)
      at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:192)
      at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:184)
      at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:172)
      at java.lang.Thread.run(Thread.java:619)

      ===================
      Broker B Thread Dump
      ===================

      "ActiveMQ Transport Status Monitor: ssl" daemon prio=10 tid=0x0836e000 nid=0x61b9 waiting on condition [0xcd9ae000..0xcd9ae5f0]
      java.lang.Thread.State: TIMED_WAITING (sleeping)
      at java.lang.Thread.sleep(Native Method)
      at org.apache.activemq.broker.TransportStatusDetector.run(TransportStatusDetector.java:102)
      at java.lang.Thread.run(Thread.java:619)

      "ActiveMQ Transport Server: ssl://localhost:443" daemon prio=10 tid=0x08537c00 nid=0x61b8 runnable [0xcd9ff000..0xcd9ff570]
      java.lang.Thread.State: RUNNABLE
      at java.net.PlainSocketImpl.socketAccept(Native Method)
      at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:384)

      • locked <0xd37acfb0> (a java.net.SocksSocketImpl)
        at java.net.ServerSocket.implAccept(ServerSocket.java:450)
        at com.sun.net.ssl.internal.ssl.SSLServerSocketImpl.accept(SSLServerSocketImpl.java:259)
        at org.apache.activemq.transport.tcp.TcpTransportServer.run(TcpTransportServer.java:197)
        at java.lang.Thread.run(Thread.java:619)

      "ActiveMQ Transport Status Monitor: openwire" daemon prio=10 tid=0xce21fc00 nid=0x61b7 waiting on condition [0xcdb56000..0xcdb566f0]
      java.lang.Thread.State: TIMED_WAITING (sleeping)
      at java.lang.Thread.sleep(Native Method)
      at org.apache.activemq.broker.TransportStatusDetector.run(TransportStatusDetector.java:102)
      at java.lang.Thread.run(Thread.java:619)

      "ActiveMQ Transport Server: tcp://localhost:5675" daemon prio=10 tid=0xcdeffc00 nid=0x61b6 runnable [0xcddff000..0xcddff670]
      java.lang.Thread.State: RUNNABLE
      at java.net.PlainSocketImpl.socketAccept(Native Method)
      at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:384)

      • locked <0xd37aca78> (a java.net.SocksSocketImpl)
        at java.net.ServerSocket.implAccept(ServerSocket.java:450)
        at java.net.ServerSocket.accept(ServerSocket.java:421)
        at org.apache.activemq.transport.tcp.TcpTransportServer.run(TcpTransportServer.java:197)
        at java.lang.Thread.run(Thread.java:619)

      INACTIVITY MONITORS
      --------------------------------

      "InactivityMonitor Async Task: java.util.concurrent.ThreadPoolExecutor$Worker@704cf5" daemon prio=10 tid=0x081af400 nid=0x6498 waiting on condition [0xcba92000..0xcba926f0]
      java.lang.Thread.State: TIMED_WAITING (parking)
      at sun.misc.Unsafe.park(Native Method)

      • parking to wait for <0xd346d318> (a java.util.concurrent.SynchronousQueue$TransferStack)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
        at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:424)
        at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:323)
        at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:874)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:944)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:906)
        at java.lang.Thread.run(Thread.java:619)

      "InactivityMonitor Async Task: java.util.concurrent.ThreadPoolExecutor$Worker@b8d09d" daemon prio=10 tid=0x081aec00 nid=0x6452 waiting for monitor entry [0xcd70b000..0xcd70c570]
      java.lang.Thread.State: BLOCKED (on object monitor)
      at com.sun.net.ssl.internal.ssl.SSLSocketImpl.writeRecord(SSLSocketImpl.java:661)

      • waiting to lock <0xd3791d18> (a java.lang.Object)
        at com.sun.net.ssl.internal.ssl.SSLSocketImpl.sendAlert(SSLSocketImpl.java:1624)
        at com.sun.net.ssl.internal.ssl.SSLSocketImpl.warning(SSLSocketImpl.java:1477)
        at com.sun.net.ssl.internal.ssl.SSLSocketImpl.closeInternal(SSLSocketImpl.java:1279)
      • locked <0xd3791408> (a com.sun.net.ssl.internal.ssl.SSLSocketImpl)
        at com.sun.net.ssl.internal.ssl.SSLSocketImpl.close(SSLSocketImpl.java:1218)
        at org.apache.activemq.transport.tcp.TcpTransport.doStop(TcpTransport.java:430)
        at org.apache.activemq.util.ServiceSupport.stop(ServiceSupport.java:57)
        at org.apache.activemq.transport.tcp.TcpTransport.stop(TcpTransport.java:439)
        at org.apache.activemq.transport.InactivityMonitor.stop(InactivityMonitor.java:90)
        at org.apache.activemq.transport.TransportFilter.stop(TransportFilter.java:63)
        at org.apache.activemq.transport.WireFormatNegotiator.stop(WireFormatNegotiator.java:78)
        at org.apache.activemq.transport.TransportFilter.stop(TransportFilter.java:63)
        at org.apache.activemq.broker.TransportConnection.disposeTransport(TransportConnection.java:1239)
        at org.apache.activemq.broker.TransportConnection.doStop(TransportConnection.java:917)
        at org.apache.activemq.broker.jmx.ManagedTransportConnection.doStop(ManagedTransportConnection.java:74)
        at org.apache.activemq.broker.TransportConnection.stop(TransportConnection.java:876)
        at org.apache.activemq.util.ServiceSupport.dispose(ServiceSupport.java:40)
        at org.apache.activemq.broker.TransportConnection.serviceTransportException(TransportConnection.java:206)
        at org.apache.activemq.broker.TransportConnection$1.onException(TransportConnection.java:185)
        at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:98)
        at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:98)
        at org.apache.activemq.transport.WireFormatNegotiator.onException(WireFormatNegotiator.java:143)
        at org.apache.activemq.transport.InactivityMonitor.onException(InactivityMonitor.java:201)
        at org.apache.activemq.transport.InactivityMonitor$5.run(InactivityMonitor.java:139)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
        at java.lang.Thread.run(Thread.java:619)

      "InactivityMonitor WriteCheck" daemon prio=10 tid=0x083c3c00 nid=0x61bb in Object.wait() [0xcd7ff000..0xcd7ff4f0]
      java.lang.Thread.State: TIMED_WAITING (on object monitor)
      at java.lang.Object.wait(Native Method)

      • waiting on <0xd37d1e58> (a java.util.TaskQueue)
        at java.util.TimerThread.mainLoop(Timer.java:509)
      • locked <0xd37d1e58> (a java.util.TaskQueue)
        at java.util.TimerThread.run(Timer.java:462)

      "InactivityMonitor ReadCheck" daemon prio=10 tid=0x083c4800 nid=0x61ba in Object.wait() [0xcd95d000..0xcd95d470]
      java.lang.Thread.State: TIMED_WAITING (on object monitor)
      at java.lang.Object.wait(Native Method)

      • waiting on <0xd37ae218> (a java.util.TaskQueue)
        at java.util.TimerThread.mainLoop(Timer.java:509)
      • locked <0xd37ae218> (a java.util.TaskQueue)
        at java.util.TimerThread.run(Timer.java:462)

      Broker A - B CONNECTIONS
      ---------------------------------------

      "ActiveMQ Connection Dispatcher: vm://localhost#0" daemon prio=10 tid=0x082f5000 nid=0x61e8 waiting for monitor entry [0xccb57000..0xccb57570]
      java.lang.Thread.State: BLOCKED (on object monitor)
      at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:39)

      • waiting to lock <0xd3782640> (a java.lang.Object)
        at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:59)
        at org.apache.activemq.network.DemandForwardingBridgeSupport.serviceLocalCommand(DemandForwardingBridgeSupport.java:579)
        at org.apache.activemq.network.DemandForwardingBridgeSupport$1.onCommand(DemandForwardingBridgeSupport.java:138)
        at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:100)
        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67)
        at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:98)
        at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
      • locked <0xd3782a58> (a java.lang.Object)
        at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:59)
        at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1151)
        at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:766)
        at org.apache.activemq.broker.TransportConnection.iterate(TransportConnection.java:801)
        at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:98)
        at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:36)

      "VMTransport: vm://localhost#1" daemon prio=10 tid=0x082f4000 nid=0x61e7 runnable [0xccba7000..0xccba86f0]
      java.lang.Thread.State: RUNNABLE
      at java.net.SocketOutputStream.socketWrite0(Native Method)
      at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92)
      at java.net.SocketOutputStream.write(SocketOutputStream.java:136)
      at com.sun.net.ssl.internal.ssl.OutputRecord.writeBuffer(OutputRecord.java:283)
      at com.sun.net.ssl.internal.ssl.OutputRecord.write(OutputRecord.java:272)
      at com.sun.net.ssl.internal.ssl.SSLSocketImpl.writeRecord(SSLSocketImpl.java:665)

      • locked <0xd3791d18> (a java.lang.Object)
        at com.sun.net.ssl.internal.ssl.AppOutputStream.write(AppOutputStream.java:59)
      • locked <0xd3793090> (a com.sun.net.ssl.internal.ssl.AppOutputStream)
        at org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush(TcpBufferedOutputStream.java:105)
        at java.io.DataOutputStream.flush(DataOutputStream.java:106)
        at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:154)
        at org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:189)
      • locked <0xd3793868> (a org.apache.activemq.transport.InactivityMonitor$2)
        at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:82)
        at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:91)
        at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
      • locked <0xd3793958> (a java.lang.Object)
        at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1151)
        at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:766)
        at org.apache.activemq.broker.TransportConnection.dispatchSync(TransportConnection.java:727)
        at org.apache.activemq.broker.region.TopicSubscription.dispatch(TopicSubscription.java:427)
        at org.apache.activemq.broker.region.TopicSubscription.add(TopicSubscription.java:93)
        at org.apache.activemq.broker.region.policy.SimpleDispatchPolicy.dispatch(SimpleDispatchPolicy.java:48)
        at org.apache.activemq.broker.region.Topic.dispatch(Topic.java:601)
        at org.apache.activemq.broker.region.Topic.doMessageSend(Topic.java:427)
      • locked <0xd3609748> (a org.apache.activemq.broker.region.Topic)
        at org.apache.activemq.broker.region.Topic.send(Topic.java:371)
        at org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:328)
        at org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:402)
        at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:224)
        at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:125)
        at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:95)
        at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:125)
        at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:135)
        at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:434)
        at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:623)
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:281)
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:178)
        at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:100)
        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67)
        at org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:202)
        at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:98)
        at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:36)

      ====>
      ====> Nothing to dispatch from broker B
      ====>
      "ActiveMQ Connection Dispatcher: /hostforA:50665" daemon prio=10 tid=0x08431800 nid=0x61e4 in Object.wait() [0xccc4a000..0xccc4a770]
      java.lang.Thread.State: WAITING (on object monitor)
      at java.lang.Object.wait(Native Method)

      • waiting on <0xd37a6010> (a java.lang.Object)
        at java.lang.Object.wait(Object.java:485)
        at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:105)
      • locked <0xd37a6010> (a java.lang.Object)
        at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:36)

      ====>
      ====> Queue of traffic to broker A is full - and because this is the reading thread it also stops reading....
      ====>
      "ActiveMQ Transport: tcp:///hostforA:50665" daemon prio=10 tid=0x08430400 nid=0x61e3 waiting on condition [0xccc9b000..0xccc9b4f0]
      java.lang.Thread.State: WAITING (parking)
      at sun.misc.Unsafe.park(Native Method)

      • parking to wait for <0xd37827e8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1889)
        at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:254)
        at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:92)
        at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
      • locked <0xd3782640> (a java.lang.Object)
        at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:59)
        at org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteCommand(DemandForwardingBridgeSupport.java:403)
        at org.apache.activemq.network.DemandForwardingBridgeSupport$2.onCommand(DemandForwardingBridgeSupport.java:149)
        at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:100)
        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67)
        at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:134)
        at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:170)
      • locked <0xd3779350> (a org.apache.activemq.transport.InactivityMonitor$1)
        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:185)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:172)
        at java.lang.Thread.run(Thread.java:619)

      CONSUMER CONNECTIONS
      ----------------------------------------

      "ActiveMQ Connection Dispatcher: /consumer1:2556" daemon prio=10 tid=0x085b9000 nid=0x61c8 in Object.wait() [0xcd484000..0xcd484570]
      java.lang.Thread.State: WAITING (on object monitor)
      at java.lang.Object.wait(Native Method)

      • waiting on <0xd37bfd00> (a java.lang.Object)
        at java.lang.Object.wait(Object.java:485)
        at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:105)
      • locked <0xd37bfd00> (a java.lang.Object)
        at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:36)

      "ActiveMQ Transport: ssl:///consumer1:2556" daemon prio=10 tid=0x085be000 nid=0x61c2 runnable [0xcd66a000..0xcd66a470]
      java.lang.Thread.State: RUNNABLE
      at java.net.SocketInputStream.socketRead0(Native Method)
      at java.net.SocketInputStream.read(SocketInputStream.java:129)
      at com.sun.net.ssl.internal.ssl.InputRecord.readFully(InputRecord.java:293)
      at com.sun.net.ssl.internal.ssl.InputRecord.read(InputRecord.java:331)
      at com.sun.net.ssl.internal.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:722)

      • locked <0xd37c0af0> (a java.lang.Object)
        at com.sun.net.ssl.internal.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:679)
        at com.sun.net.ssl.internal.ssl.AppInputStream.read(AppInputStream.java:75)
      • locked <0xd37c0e30> (a com.sun.net.ssl.internal.ssl.AppInputStream)
        at org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50)
        at org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:58)
        at java.io.DataInputStream.readInt(DataInputStream.java:370)
        at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268)
        at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:192)
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:184)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:172)
        at java.lang.Thread.run(Thread.java:619)

      It seems that B gets stuck because the queue in the VMTransport fills up. The thread in B that stops, both blocks removal of items from the queue, and also is responsible for reading data from the A to B socket. Therefore, all comms between A & B stops.

      Attachments

        Activity

          People

            rajdavies Robert Davies
            samwalton Sam Walton
            Votes:
            0 Vote for this issue
            Watchers:
            0 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: