ActiveMQ / AMQ-3605

NullPointerException in TransportConnection

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 5.5.0
    • Fix Version/s: 5.6.0
    • Component/s: Transport
    • Labels:
    • Environment:

      SLES 11 SP1
      java version "1.6.0_23"
      perl 5.10.0 + Net::Stomp 0.38_99

      Description

      I'm running ActiveMQ 5.5.0 and clients using Net::Stomp 0.38_99 and I'm seeing infrequent NullPointerExceptions in TransportConnection:

      Exception in thread "ActiveMQ Connection Dispatcher: /172.31.201.11:50607" java.lang.NullPointerException
              at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:327)
              at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
              at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
              at org.apache.activemq.transport.stomp.StompTransportFilter.sendToActiveMQ(StompTransportFilter.java:81)
              at org.apache.activemq.transport.stomp.StompSubscription.onMessageDispatch(StompSubscription.java:79)
              at org.apache.activemq.transport.stomp.ProtocolConverter.onActiveMQCommand(ProtocolConverter.java:596)
              at org.apache.activemq.transport.stomp.StompTransportFilter.oneway(StompTransportFilter.java:58)
              at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
              at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1270)
              at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:815)
              at org.apache.activemq.broker.TransportConnection.iterate(TransportConnection.java:851)
              at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:104)
              at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:42)
      

      This seems to happen 1-2 times per month or so but the result is dire: new messages aren't delivered to the affected client (you can see the number of pending messages increasing in the admin web interface) until the client or ActiveMQ is restarted.

      Relevant code snippet from TransportConnection.java,

      326         if (context != null) {
      327             if (context.isDontSendReponse()) {
      

      implies that we are dealing with a race condition. I'm not familiar with the ActiveMQ code base but it looks like it grabs a lock (serviceLock) before entering that function, so not sure what's going on.
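
      To illustrate how a null check immediately followed by a dereference can still throw, here is a minimal sketch of a check-then-act race on a shared field. The class, the Context type, and the method names are hypothetical and are not taken from the ActiveMQ source; the Runnable argument only simulates another thread clearing the field between the two reads.

```java
/**
 * Hypothetical stand-in for a connection class; none of these names are
 * ActiveMQ API. The point is only the check-then-act pattern on a field.
 */
class CheckThenActSketch {
    static final class Context {
        boolean isDontSendResponse() { return false; }
    }

    // Shared field that another thread may clear at any time.
    private volatile Context context = new Context();

    /** Simulates a second thread resetting the field. */
    public void clearContext() { context = null; }

    /** Racy: the field is read twice, so it can become null between the reads. */
    public boolean unsafeService(Runnable betweenReads) {
        if (context != null) {
            betweenReads.run();                   // the other thread wins the race here
            return context.isDontSendResponse();  // second read may see null
        }
        return false;
    }

    /** Safer: read the field once into a local and use only the local. */
    public boolean safeService(Runnable betweenReads) {
        Context local = context;
        if (local != null) {
            betweenReads.run();
            return local.isDontSendResponse();
        }
        return false;
    }

    public static void main(String[] args) {
        CheckThenActSketch racy = new CheckThenActSketch();
        try {
            racy.unsafeService(racy::clearContext);
            System.out.println("no exception");
        } catch (NullPointerException e) {
            System.out.println("NullPointerException after a passing null check");
        }
        CheckThenActSketch safe = new CheckThenActSketch();
        System.out.println("safe variant returned " + safe.safeService(safe::clearContext));
    }
}
```

      Whether this is what actually happens in TransportConnection is an open question at this point in the report; the sketch only shows that such a stack trace is consistent with two threads touching the same field.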

      Since there's no timestamp associated with the stack trace, I'm not completely sure what's going on on the client side. I've tried to reproduce it by writing a small script that uses Net::Stomp in a similar way to my real clients, but no luck so far.

      No idea if it's relevant, but my affected clients have been both consuming and producing, and sending/receiving on both topics and queues.

      1. StompDedicatedTaskRunnerTests.java (5 kB, Timothy Bish)
      2. StompDedicatedTaskRunnerTests.java (7 kB, Timothy Bish)
      3. AMQ-3605-Patch.txt (6 kB, Timothy Bish)

        Activity

        Tommy Lindgren added a comment -

        I haven't tried the trunk version yet, but I don't see any relevant changes in svn so figured I should ask here first.

        Gary Tully added a comment -

        Please try trunk; if you can reproduce it there, the stack trace (line numbers) and context will match trunk, which is where it will be fixed.
        The addition of an inactivity monitor for stomp 1.1 support has caused some tightening up of the synchronization between stomp and amq request paths, which may be relevant.

        Also, please attach the broker log files; it may be contention between connection teardown and dispatch. In addition, if you can reproduce it, add trace-level logging to TransportConnection to gather some extra information.

        Tommy Lindgren added a comment -

        Thanks for the quick reply.

        I saved the stack trace above from the console output; I don't see it in my activemq.log. I do see some EOFExceptions for other clients in activemq.log around that time:

        2011-11-21 17:26:58,051 | INFO  | Transport failed: java.io.EOFException | org.apache.activemq.broker.TransportConnection.Transport | ActiveMQ Transport: tcp:///172.31.201.11:50680
        2011-11-21 17:44:39,533 | INFO  | Transport failed: java.io.EOFException | org.apache.activemq.broker.TransportConnection.Transport | ActiveMQ Transport: tcp:///172.31.201.11:46575
        2011-11-21 18:30:38,362 | INFO  | Transport failed: java.io.EOFException | org.apache.activemq.broker.TransportConnection.Transport | ActiveMQ Transport: tcp:///172.31.201.11:46577
        

        I think the NullPointerException occurred shortly after 17:44. As far as I can tell, EOFExceptions are normal for stomp clients in 5.5.0, and I don't see them for affected clients anyway.

        I'll try a trunk snapshot.

        Timothy Bish added a comment -

        I've started looking into this a bit, and while I haven't been able to reproduce the NullPointerException, I can cause a different exception that seems to be related. The error seems to be coming from the auto ack of a message that's being dispatched as the connection is closed or broken on the client side. This happens more often with a large prefetch, such that the broker is busy dispatching messages when the disconnect comes in.

        Attaching the test case I have so far, as I will be out over Thanksgiving.

        2011-11-23 10:20:42,152 [127.0.0.1:57419] - WARN  Service                        - Async error occurred: java.lang.IllegalStateException: Cannot lookup a consumer from a connection that had not been registered: ID:OfficePC-40536-1322061638124-3:1
        java.lang.IllegalStateException: Cannot lookup a consumer from a connection that had not been registered: ID:OfficePC-40536-1322061638124-3:1
        	at org.apache.activemq.broker.SingleTransportConnectionStateRegister.lookupConnectionState(SingleTransportConnectionStateRegister.java:83)
        	at org.apache.activemq.broker.TransportConnection.lookupConnectionState(TransportConnection.java:1434)
        	at org.apache.activemq.broker.TransportConnection.getConsumerBrokerExchange(TransportConnection.java:1348)
        	at org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:476)
        	at org.apache.activemq.command.MessageAck.visit(MessageAck.java:229)
        	at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:318)
        	at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:181)
        	at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:45)
        	at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:229)
        	at org.apache.activemq.transport.stomp.StompTransportFilter.sendToActiveMQ(StompTransportFilter.java:87)
        	at org.apache.activemq.transport.stomp.StompSubscription.onMessageDispatch(StompSubscription.java:78)
        	at org.apache.activemq.transport.stomp.ProtocolConverter.onActiveMQCommand(ProtocolConverter.java:746)
        	at org.apache.activemq.transport.stomp.StompTransportFilter.oneway(StompTransportFilter.java:64)
        	at org.apache.activemq.transport.AbstractInactivityMonitor.doOnewaySend(AbstractInactivityMonitor.java:262)
        	at org.apache.activemq.transport.AbstractInactivityMonitor.oneway(AbstractInactivityMonitor.java:244)
        	at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
        	at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1281)
        	at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:830)
        	at org.apache.activemq.broker.TransportConnection.iterate(TransportConnection.java:866)
        	at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:98)
        	at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:36)
        
        Timothy Bish added a comment -

        For the error I was able to reproduce, the problem comes from a race between the processDispatch call in TransportConnection's iterate method and the processRemoveConnection method, where it does:

         TransportConnectionState state = unregisterConnectionState(id);
        

        Since the connection is removed right before the processDispatch makes the loop from the Stomp transport's auto ack of the dispatched message, we end up with the above error. The NPE that the reporter has seen is probably related to something similar.
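
        The interleaving can be shown with a small sketch: a removal path unregisters the connection, and an ack for a message that was already in flight then looks it up. The register class below is a simplified, hypothetical model and not the real SingleTransportConnectionStateRegister; the tolerant variant is just one conceivable way to handle a late ack and is not meant to describe the attached patch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative register of connection state keyed by a made-up connection id. */
class RegisterRaceSketch {
    private final Map<String, String> states = new ConcurrentHashMap<>();

    public void register(String connectionId) {
        states.put(connectionId, "state-of-" + connectionId);
    }

    /** What a connection-removal path would do. */
    public String unregister(String connectionId) {
        return states.remove(connectionId);
    }

    /** Strict lookup, as an ack for an in-flight dispatch would perform it. */
    public String lookup(String connectionId) {
        String state = states.get(connectionId);
        if (state == null) {
            throw new IllegalStateException(
                "Cannot lookup a consumer from a connection that had not been registered: "
                + connectionId);
        }
        return state;
    }

    /** Tolerant variant: report whether the connection is still there instead of throwing. */
    public boolean ackIfStillRegistered(String connectionId) {
        return states.get(connectionId) != null;
    }

    public static void main(String[] args) {
        RegisterRaceSketch register = new RegisterRaceSketch();
        String id = "ID:example-1";        // made-up connection id
        register.register(id);
        register.unregister(id);           // the removal path runs first
        try {
            register.lookup(id);           // the late ack arrives afterwards
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        System.out.println("ack processed: " + register.ackIfStillRegistered(id));
    }
}
```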

        Timothy Bish added a comment -

        Patch that seems to resolve the issue.

        Timothy Bish added a comment -

        Updated test case, not really meant for inclusion as it doesn't actually assert anything to show the exceptions but at least allows them to be seen. Can fail as the disconnect response isn't always read fast enough due to all the prefetched messages.

        Timothy Bish added a comment -

        Added a tentative patch to the TransportConnection class. Seems to resolve the issue but I was seeing some failures in some unit tests locally and haven't fully resolved if they are related to this change or just the normal flaky tests we see fail sometimes. Most failures seem to be in the Network bridge areas.

        Tommy Lindgren added a comment (edited) -

        I've done some testing against 5.6-SNAPSHOT (as of 2011-11-23). I don't see the NPE anymore, but clients closing the socket in an unclean fashion still seem to be a problem.

        It looks like my affected client now hangs on the SEND call. I can reproduce this problem within minutes by stressing my clients while a script repeatedly forks, connects to AMQ, subscribes to a topic and then calls _exit() (to make sure the socket isn't closed by the garbage collector).
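
        For readers who want to try something similar without Perl, below is a rough Java sketch of one iteration of such a client: connect, subscribe to a topic, then terminate without sending DISCONNECT or closing the socket. It is not the original Net::Stomp script. The host, port, and topic are placeholders supplied on the command line, the empty login and passcode are assumptions, and unlike a well-behaved client it does not wait for the CONNECTED frame before subscribing.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

class AbruptStompClientSketch {
    /** Builds a body-less STOMP frame: command, header lines, blank line, NUL terminator. */
    public static byte[] frame(String command, String... headers) {
        StringBuilder sb = new StringBuilder(command).append('\n');
        for (String header : headers) {
            sb.append(header).append('\n');
        }
        sb.append('\n').append('\0');
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    /** Connects, subscribes, and then kills the JVM without any clean shutdown. */
    public static void connectSubscribeAndDie(String host, int port, String topic)
            throws IOException {
        Socket socket = new Socket(host, port);
        OutputStream out = socket.getOutputStream();
        out.write(frame("CONNECT", "login:", "passcode:"));
        out.write(frame("SUBSCRIBE", "destination:/topic/" + topic, "ack:auto"));
        out.flush();
        // No DISCONNECT frame and no close(): halt() skips shutdown hooks and finalizers.
        Runtime.getRuntime().halt(0);
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 3) {
            System.out.println("usage: AbruptStompClientSketch <host> <port> <topic>");
            return;
        }
        connectSubscribeAndDie(args[0], Integer.parseInt(args[1]), args[2]);
    }
}
```

        Running it repeatedly from a shell loop while other clients are under load would approximate the fork-and-exit behaviour described above, though timing will differ from the Perl original.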

        The resulting AMQ log is 200MB+ and may contain sensitive information, so I can't attach it as it is. Just before the client hangs I see (log excerpts slightly edited, message bodies and headers are removed):

        2011-12-12 10:56:21,776 | TRACE | Received: 
        SEND
        [...]
        
        2011-12-12 10:56:21,776 | DEBUG | message is ActiveMQTextMessage {commandId = 2262, responseRequired = false, messageId = ID:dev01-54928-1323379526715-4:15980:-1:1:2254, originalDestination = null, originalTransactionId = null, producerId = ID:dev01-54928-1323379526715-4:15980:-1:1, destination = topic://change.TRANSFER, transactionId = null, expiration = 0, timestamp = 1323683781775, arrival = 0, brokerInTime = 1323683781775, brokerOutTime = 1323683781775, correlationId = null, replyTo = null, persistent = false, type = EVENT.ResourceChangeEvent, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1507, properties = {[...]}, readOnlyProperties = true, readOnlyBody = true, droppable = false, text = <?xml version="1.0" encoding="UTF-8"?>
        <atom:...atom:entry>
        }
        2011-12-12 10:56:21,776 | DEBUG | Message available, but continuation is already resumed.  Buffer for next time.
        2011-12-12 10:56:21,776 | DEBUG | Transport failed: java.io.EOFException
        java.io.EOFException
                at java.io.DataInputStream.readByte(DataInputStream.java:250)
                at org.apache.activemq.transport.stomp.StompWireFormat.readHeaderLine(StompWireFormat.java:155)
                at org.apache.activemq.transport.stomp.StompWireFormat.readLine(StompWireFormat.java:148)
                at org.apache.activemq.transport.stomp.StompWireFormat.parseAction(StompWireFormat.java:170)
                at org.apache.activemq.transport.stomp.StompWireFormat.unmarshal(StompWireFormat.java:98)
                at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:229)
                at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:221)
                at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:204)
                at java.lang.Thread.run(Thread.java:662)
        2011-12-12 10:56:21,776 | DEBUG | message for ActiveMQMessageConsumer { value=ID:dev01-54928-1323379526715-8:58:1:1, started=true } continuation=org.eclipse.jetty.server.AsyncContinuation@e9b412d@IDLE,initial
        2011-12-12 10:56:21,776 | DEBUG | message for ActiveMQMessageConsumer { value=ID:dev01-54928-1323379526715-8:57:1:1, started=true } continuation=org.eclipse.jetty.server.AsyncContinuation@50afeae@IDLE,initial
        2011-12-12 10:56:21,776 | TRACE | org.apache.activemq.broker.region.cursors.QueueStorePrefetch@73a2335d:daemon.archived,batchResetNeeded=false,storeHasMessages=true,size=0,cacheEnabled=true - fillBatch
        2011-12-12 10:56:21,777 | TRACE | ID:dev01-54928-1323379526715-8:58:1:1 received message: MessageDispatch {commandId = 391, responseRequired = false, consumerId = ID:dev01-54928-1323379526715-8:58:1:1, destination = topic://change.TRANSFER, message = ActiveMQTextMessage {commandId = 2262, responseRequired = false, messageId = ID:dev01-54928-1323379526715-4:15980:-1:1:2254, originalDestination = null, originalTransactionId = null, producerId = ID:dev01-54928-1323379526715-4:15980:-1:1, destination = topic://change.TRANSFER, transactionId = null, expiration = 0, timestamp = 1323683781775, arrival = 0, brokerInTime = 1323683781775, brokerOutTime = 1323683781775, correlationId = null, replyTo = null, persistent = false, type = EVENT.ResourceChangeEvent, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1507, properties = {[...]}, readOnlyProperties = true, readOnlyBody = true, droppable = false, text = <?xml version="1.0" encoding="UTF-8"?>
        <atom:...atom:entry>
        }, redeliveryCounter = 0}
        2011-12-12 10:56:21,776 | DEBUG | Transport failed: java.net.SocketException: Broken pipe
        java.net.SocketException: Broken pipe
                at java.net.SocketOutputStream.socketWrite0(Native Method)
                at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92)
                at java.net.SocketOutputStream.write(SocketOutputStream.java:136)
                at org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush(TcpBufferedOutputStream.java:115)
                at java.io.DataOutputStream.flush(DataOutputStream.java:106)
                at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:184)
                at org.apache.activemq.transport.stomp.StompTransportFilter.sendToStomp(StompTransportFilter.java:97)
                at org.apache.activemq.transport.stomp.StompSubscription.onMessageDispatch(StompSubscription.java:99)
                at org.apache.activemq.transport.stomp.ProtocolConverter.onActiveMQCommand(ProtocolConverter.java:746)
                at org.apache.activemq.transport.stomp.StompTransportFilter.oneway(StompTransportFilter.java:64)
                at org.apache.activemq.transport.AbstractInactivityMonitor.doOnewaySend(AbstractInactivityMonitor.java:262)
                at org.apache.activemq.transport.AbstractInactivityMonitor.oneway(AbstractInactivityMonitor.java:244)
                at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
                at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1281)
                at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:830)
                at org.apache.activemq.broker.TransportConnection.iterate(TransportConnection.java:866)
                at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:98)
                at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:36)
        

        The "Received: SEND" message is from my affected client. I see a bunch of such messages after the SocketException before the client hangs two seconds later. I assume the client hangs because some buffer at some level is full. I can't see anything obvious explaining why AMQ stops processing the client's messages.

        Occasionally I also see the following exception:

        java.lang.IllegalStateException: Cannot lookup a producer from a connection that had not been registered: ID:dev01-54928-1323379526715-4:59
                at org.apache.activemq.broker.SingleTransportConnectionStateRegister.lookupConnectionState(SingleTransportConnectionStateRegister.java:93)
                at org.apache.activemq.broker.TransportConnection.lookupConnectionState(TransportConnection.java:1438)
                at org.apache.activemq.broker.TransportConnection.getProducerBrokerExchange(TransportConnection.java:1314)
                at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:468)
                at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:681)
                at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:318)
                at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:181)
                at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:45)
                at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:229)
                at org.apache.activemq.transport.stomp.StompTransportFilter.sendToActiveMQ(StompTransportFilter.java:87)
                at org.apache.activemq.transport.stomp.ProtocolConverter.sendToActiveMQ(ProtocolConverter.java:166)
                at org.apache.activemq.transport.stomp.ProtocolConverter.onStompSend(ProtocolConverter.java:292)
                at org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:204)
                at org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:76)
                at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
                at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:222)
                at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:204)
        

        This exception is triggered more often when I do the stress test, but sometimes it doesn't appear to have been thrown at all when the client has hung.

        Not sure how useful these log excerpts are. I haven't tested the attached patch yet. If you think it will help I can try to build AMQ myself.

        Show
        Tommy Lindgren added a comment - - edited I've done some testing against 5.6-SNAPSHOT (as of 2011-11-23). I don't see the NPE anymore but clients closing the socket in an unclean fashion seems to still be a problem. It looks like my affected client now hangs on the SEND call. I can reproduce this problem within minutes by stressing my clients while a script repeatedly forks, connects to AMQ, subscribes to a topic and then calls _exit() (to make sure the socket isn't closed by the garbage collector). The resulting AMQ log is 200MB+ and may contain sensitive information, so I can't attach it as it is. Just before the client hangs I see (log excerpts slightly edited, message bodies and headers are removed): 2011-12-12 10:56:21,776 | TRACE | Received: SEND [...] 2011-12-12 10:56:21,776 | DEBUG | message is ActiveMQTextMessage {commandId = 2262, responseRequired = false, messageId = ID:dev01-54928-1323379526715-4:15980:-1:1: 2254, originalDestination = null, originalTransactionId = null, producerId = ID:dev01-54928-1323379526715-4:15980:-1:1, destination = topic://change.TRANSFER, trans actionId = null, expiration = 0, timestamp = 1323683781775, arrival = 0, brokerInTime = 1323683781775, brokerOutTime = 1323683781775, correlationId = null, replyTo = n ull, persistent = false, type = EVENT.ResourceChangeEvent, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1507, properties = {[...]}, readOnlyProperties = true, readO nlyBody = true, droppable = false, text = <?xml version="1.0" encoding="UTF-8"?> <atom:...atom:entry> } 2011-12-12 10:56:21,776 | DEBUG | Message available, but continuation is already resumed. Buffer for next time. 
2011-12-12 10:56:21,776 | DEBUG | Transport failed: java.io.EOFException java.io.EOFException at java.io.DataInputStream.readByte(DataInputStream.java:250) at org.apache.activemq.transport.stomp.StompWireFormat.readHeaderLine(StompWireFormat.java:155) at org.apache.activemq.transport.stomp.StompWireFormat.readLine(StompWireFormat.java:148) at org.apache.activemq.transport.stomp.StompWireFormat.parseAction(StompWireFormat.java:170) at org.apache.activemq.transport.stomp.StompWireFormat.unmarshal(StompWireFormat.java:98) at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:229) at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:221) at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:204) at java.lang.Thread.run(Thread.java:662) 2011-12-12 10:56:21,776 | DEBUG | message for ActiveMQMessageConsumer { value=ID:dev01-54928-1323379526715-8:58:1:1, started=true } continuation=org.eclipse.jetty.s erver.AsyncContinuation@e9b412d@IDLE,initial 2011-12-12 10:56:21,776 | DEBUG | message for ActiveMQMessageConsumer { value=ID:dev01-54928-1323379526715-8:57:1:1, started=true } continuation=org.eclipse.jetty.s erver.AsyncContinuation@50afeae@IDLE,initial 2011-12-12 10:56:21,776 | TRACE | org.apache.activemq.broker.region.cursors.QueueStorePrefetch@73a2335d:daemon.archived,batchResetNeeded=false,storeHasMessages=true,si ze=0,cacheEnabled=true - fillBatch 2011-12-12 10:56:21,777 | TRACE | ID:dev01-54928-1323379526715-8:58:1:1 received message: MessageDispatch {commandId = 391, responseRequired = false, consumerId = I D:dev01-54928-1323379526715-8:58:1:1, destination = topic://change.TRANSFER, message = ActiveMQTextMessage {commandId = 2262, responseRequired = false, messageId = ID:dev01-54928-1323379526715-4:15980:-1:1:2254, originalDestination = null, originalTransactionId = null, producerId = ID:dev01-54928-1323379526715-4:15980:-1:1, destination = topic://change.TRANSFER, transactionId = null, expiration = 0, 
        Tommy Lindgren added a comment -

        I went ahead and tested the patch. It didn't apply cleanly, but the rejected hunks only changed for-loop constructs, so I ignored those.

        The patch didn't solve the issue. Perhaps not related, but at 17:54:01 I see a SocketException, and at 17:54:05 there's an InactivityIOException (my client hung at 17:54:13):

        2011-12-12 17:54:05,045 | TRACE | Received:
        SEND
        [...]
        
        2011-12-12 17:54:05,043 | DEBUG | Transport failed: org.apache.activemq.transport.InactivityIOException: Cannot send, channel has already failed: tcp://172.31.201.1:45974
        org.apache.activemq.transport.InactivityIOException: Cannot send, channel has already failed: tcp://172.31.201.1:45974
                at org.apache.activemq.transport.AbstractInactivityMonitor.doOnewaySend(AbstractInactivityMonitor.java:255)
                at org.apache.activemq.transport.AbstractInactivityMonitor.oneway(AbstractInactivityMonitor.java:244)
                at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
                at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1268)
                at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:804)
                at org.apache.activemq.broker.TransportConnection.iterate(TransportConnection.java:839)
                at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:98)
                at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:36)
        

        The SEND message is from the affected client.

        Are there any particular messages I should look for? Or do you have any suggestions on where to put extra LOG.debug calls?

        Timothy Bish added a comment -

        Depends somewhat on what branch of code you applied this to. It should be applied to trunk and not a 5.5.x branch. Testing here shows no issues of this sort so far.

        Tommy Lindgren added a comment - edited

        The patch was applied to trunk (checkout from today, r1213236).

        Timothy Bish added a comment -

        From the little bit of info provided, I don't see an issue here: it looks as though the client was disconnected due to inactivity. The debug log seems fine, as it indicates that the send to the client failed because the connection had been dropped. You didn't get any NullPointerExceptions, correct?
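
        The behaviour described above (a send failing because the channel was already marked as failed) can be pictured with a small stand-alone model. This is only a sketch under stated assumptions: FailFastChannel, markFailed and sentCount are hypothetical names invented for illustration and are not ActiveMQ classes or methods; only the shape of the error message is taken from the log above.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a transport; this is NOT ActiveMQ code. */
class FailFastChannel {
    private final String remoteAddress;
    private final AtomicBoolean failed = new AtomicBoolean(false);
    private int sent;

    FailFastChannel(String remoteAddress) {
        this.remoteAddress = remoteAddress;
    }

    /** Called once the peer is considered gone (inactivity timeout, socket error, ...). */
    void markFailed() {
        failed.set(true);
    }

    /** Sends one command, or fails fast if the channel was already marked as failed. */
    void oneway(Object command) throws IOException {
        if (failed.get()) {
            throw new IOException("Cannot send, channel has already failed: " + remoteAddress);
        }
        sent++; // a real transport would write the command to the socket here
    }

    /** Number of commands accepted before the channel failed. */
    int sentCount() {
        return sent;
    }
}
```

        In this model every dispatch attempted after markFailed() raises the exception instead of touching the socket, which would match a DEBUG "Transport failed" entry appearing for a client that has already been dropped.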

        Tommy Lindgren added a comment -

        No NullPointerExceptions with 5.6-SNAPSHOT/trunk, that is correct.

        According to the "Connections" admin web page (and netstat), the affected client is still connected to AMQ. I assume all or most of the EOFExceptions, SocketExceptions and InactivityIOExceptions are from my fork/connect/subscribe/exit script.
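
        For readers who want to reproduce this kind of connection churn, here is a rough Java sketch of one connect/subscribe/exit cycle over a raw socket. It is not the original Perl/Net::Stomp script: the class name StompChurn, the method names and the example destination are assumptions made for illustration, and the sketch does not wait for the broker's CONNECTED reply or send login/passcode headers. The frame layout (command line, header lines, blank line, body, NUL byte) follows the STOMP 1.0 wire format.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical churn client; names are illustrative, not taken from the report. */
class StompChurn {

    /** Builds one STOMP frame: command, headers, blank line, body, NUL terminator. */
    static String frame(String command, Map<String, String> headers, String body) {
        StringBuilder sb = new StringBuilder(command).append('\n');
        for (Map.Entry<String, String> h : headers.entrySet()) {
            sb.append(h.getKey()).append(':').append(h.getValue()).append('\n');
        }
        return sb.append('\n').append(body).append('\0').toString();
    }

    /** Connects, subscribes, then closes the socket without sending DISCONNECT. */
    static void connectSubscribeExit(String host, int port, String destination)
            throws IOException {
        try (Socket socket = new Socket(host, port)) {
            OutputStream out = socket.getOutputStream();
            Map<String, String> noHeaders = new LinkedHashMap<>();
            out.write(frame("CONNECT", noHeaders, "").getBytes(StandardCharsets.UTF_8));

            Map<String, String> sub = new LinkedHashMap<>();
            sub.put("destination", destination);
            sub.put("ack", "auto");
            out.write(frame("SUBSCRIBE", sub, "").getBytes(StandardCharsets.UTF_8));
            out.flush();
        } // the socket is closed here, so the broker sees the peer vanish mid-session
    }
}
```

        A call such as StompChurn.connectSubscribeExit("localhost", 61613, "/topic/example") would run one cycle, assuming the broker's STOMP connector listens on port 61613 and that the (made-up) destination is acceptable. Running many of these in a loop or from several processes is the kind of load that would be expected to produce the EOFException and SocketException entries mentioned above on the broker side.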

        Timothy Bish added a comment -

        Fix applied in trunk.


          People

          • Assignee:
            Timothy Bish
            Reporter:
            Tommy Lindgren