Details

    • Type: Bug
    • Status: Closed
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 5.1.0, 5.2.0
    • Fix Version/s: 5.2.0
    • Component/s: Broker
    • Labels:
      None
    • Environment:

      Unix (Solaris and Linux tested)

    • Regression:
      Regression

      Description

      The blocking Java Socket API doesn't have a timeout on socketWrite invocations.
      This means that if a TCP session is dropped or terminated without RST or FIN packets, the operating system is left to eventually time out the session. On the Linux kernel this timeout usually takes 15 to 30 minutes.

      For this entire period, the ActiveMQ broker hangs, and producers and consumers are unable to use the topic.
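As an editor's aside (a minimal, self-contained sketch, not part of the patches): java.net.Socket does offer setSoTimeout, but that timeout bounds blocking reads only; there is no corresponding write timeout in the standard API, which is why a write into a dead but seemingly ESTABLISHED connection can block until the kernel gives up.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

// Demonstrates that SO_TIMEOUT bounds reads, while nothing bounds writes.
public class ReadTimeoutOnly {
    public static String demo() throws IOException {
        // Local server that accepts the connection but never sends data.
        try (ServerSocket server = new ServerSocket(0);
             Socket client = new Socket("127.0.0.1", server.getLocalPort())) {
            Socket peer = server.accept();
            try {
                client.setSoTimeout(200); // applies to read() only
                client.getInputStream().read(); // blocks until the timeout fires
                return "unexpected data";
            } catch (SocketTimeoutException expected) {
                // No equivalent exists for socketWrite: a write can block for
                // as long as the kernel keeps retransmitting.
                return "read timed out";
            } finally {
                peer.close();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(demo());
    }
}
```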

      I have created two patches for this at the page:
      http://www.hanik.com/covalent/amq/index.html

      Let me show a bit more
      ---------------------------------
      "ActiveMQ Transport: tcp:///X.YYY.XXX.ZZZZ:2011" daemon prio=10 tid=0x0000000055d39000 nid=0xc78 runnable [0x00000000447c9000..0x00000000447cac10]
      java.lang.Thread.State: RUNNABLE
      at java.net.SocketOutputStream.socketWrite0(Native Method)
      at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92)

      This is a thread stuck in blocking I/O; it can remain stuck for 30 minutes while the kernel attempts TCP retransmissions.
      Unfortunately the thread dump is very misleading, since the name of the thread is not the destination, nor even remotely related to the socket it is operating on.
      To mend this, the patch includes a very simple (and configurable) ThreadNameFilter that appends the destination to the thread name and helps the system administrator correctly identify the client that is about to receive data.
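The thread-naming idea can be sketched as follows (illustrative code only; ThreadNameDecorator and its constructor argument are made-up names, not the actual ThreadNameFilter from the patch): decorate the transport thread's name with the remote address for the duration of a dispatch, then restore it, so jstack output identifies the peer.

```java
// Illustrative sketch: rename the current thread while a write is in flight.
public final class ThreadNameDecorator implements AutoCloseable {
    private final String originalName;

    // Append the remote address so a thread dump shows which peer we serve.
    public ThreadNameDecorator(String remoteAddress) {
        Thread current = Thread.currentThread();
        this.originalName = current.getName();
        current.setName(originalName + " - " + remoteAddress);
    }

    @Override
    public void close() {
        // Restore the original name once the dispatch completes.
        Thread.currentThread().setName(originalName);
    }
}
```

Used around the socket write, e.g. `try (ThreadNameDecorator d = new ThreadNameDecorator(addr)) { /* write */ }`, the decorated name shows up in jstack exactly while the thread could be stuck.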

      -----------------------------------
      at org.apache.activemq.broker.region.Topic.dispatch(Topic.java:581)
      at org.apache.activemq.broker.region.Topic.doMessageSend(Topic.java:421)

        - locked <0x00002aaaec155818> (a org.apache.activemq.broker.region.Topic)
        at org.apache.activemq.broker.region.Topic.send(Topic.java:363)

        The lock held at this point unfortunately makes the entire Topic single threaded.
        While this lock is held, no other clients (producers or consumers) can publish to or receive from this topic, and the lock can be held for up to 30 minutes.
        I consider solving this single-threaded behavior a feature enhancement that should be handled separately from this bug: even if it is solved, threads still risk being stuck in socketWrite0 on dropped connections that still appear to be established.

      For this, I have implemented a socket write-timeout filter based on a TransportFilter; this filter only times out connections that are actually writing data.
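The mechanism can be sketched roughly like this (an illustrative reconstruction, assuming a registry of in-flight writes; see patch-3-tcp-writetimeout.patch for the real code): each write records a start timestamp and clears it in a finally block, and a background sweeper flags connections whose write has been in flight longer than the configured timeout so their sockets can be force-closed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of the write-timeout idea; not the actual patch code.
public class WriteWatchdog {
    // connection id -> wall-clock time the in-flight write started (-1 = idle)
    private final Map<String, Long> writeStart = new ConcurrentHashMap<>();
    private final long timeoutMillis;

    public WriteWatchdog(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    public void registerWrite(String connectionId) {
        writeStart.put(connectionId, System.currentTimeMillis());
    }

    public void deRegisterWrite(String connectionId) {
        // Reset to the idle sentinel, matching the fixed patch's finally block.
        writeStart.put(connectionId, -1L);
    }

    /** Returns the ids whose write has exceeded the timeout; the caller
     *  would close those sockets to unblock the stuck writer threads. */
    public List<String> findStuckWriters(long now) {
        List<String> stuck = new ArrayList<>();
        for (Map.Entry<String, Long> e : writeStart.entrySet()) {
            long start = e.getValue();
            if (start > 0 && now - start > timeoutMillis) {
                stuck.add(e.getKey());
            }
        }
        return stuck;
    }
}
```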

      The two patches are at:
      http://www.hanik.com/covalent/amq/patch-1-threadname-filter.patch
      http://www.hanik.com/covalent/amq/patch-3-tcp-writetimeout.patch

      The binary 0000.jar applies to both 5.1 and trunk and can be used today in existing environments.

      1. jstack.blockedSession
        163 kB
        Felix Ehm
      2. patch-3-tcp-writetimeout.patch
        11 kB
        Filip Hanik
      3. patch-1-threadname-filter.patch
        4 kB
        Filip Hanik

        Activity

        Felix Ehm added a comment -

        Hi,

        Thanks for the hint. I haven't tested it, but it appears reasonable.

        Is there a reason for not having a timeout configured by default?

        Cheers,
        Felix

        Filip Hanik added a comment -

        Closing: the WriteTimeoutFilter thread is not configured in the example stack trace.

        Filip Hanik added a comment -

        Felix, did you configure a timeout? transport.soWriteTimeout?

        Example for configuring socket timeouts: transport.soTimeout applies to reads, transport.soWriteTimeout applies to writes. (Note that the & in the URI must be escaped as &amp; inside the XML configuration.)

        <transportConnector
            name="tcp1"
            uri="tcp://127.0.0.1:61616?transport.soTimeout=10000&amp;transport.soWriteTimeout=15000"
        />

        Felix Ehm added a comment -

        jstack thread dump

        Felix Ehm added a comment -

        jstack thread dump

        Felix Ehm added a comment - edited

        Hi,

        I am using ActiveMQ 5.3.0 and have apparently run into this reported problem again.
        Our clients weren't able to get any sessions, and I first thought it was a problem with the CopyOnWriteArray object where consumers are kept. But then I saw one thread in RUNNABLE holding 3 mutexes and writing via java.net.SocketOutputStream.socketWrite0.

        I attached the thread dump file for investigation.

        Unfortunately I can't reproduce this situation with a unit test. The only additional information I got from our client was that he tried to restart the connection within a very short period (1-2 s).

        Gary Tully added a comment -

        These fixes will now make 5.2.0 rc3

        Filip Hanik added a comment -

        That's correct Gary, thank you!

        Gary Tully added a comment -

        update applied in r711292

        Gary Tully added a comment -

        just to be sure, the change is one line, right?:

        Index: src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java
        ===================================================================
        --- src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java	(revision 711284)
        +++ src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java	(working copy)
        @@ -95,7 +95,7 @@
                             writeTimestamp = System.currentTimeMillis();
                             out.write(b, off, len);
                         } finally {
        -                    writeTimestamp = System.currentTimeMillis();
        +                    writeTimestamp = -1;
                         }
                     }
                 }
        
        Filip Hanik added a comment -

        Index: activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java

        looks like I have a copy paste error

        + try {
        +     writeTimestamp = System.currentTimeMillis();
        +     out.write(b, off, len);
        + } finally {
        +     writeTimestamp = System.currentTimeMillis();
        + }

        should be

        + try {
        +     writeTimestamp = System.currentTimeMillis();
        +     out.write(b, off, len);
        + } finally {
        +     writeTimestamp = -1;
        + }
        Gary Tully added a comment -

        patch applied in r710109
        thanks.

        Gary Tully added a comment -

        agreed, will commit in an hour or so, once a sanity run of the tests completes.

        Filip Hanik added a comment -

        That being said, the upper layer can react to the propagated IOException if needed. But that shouldn't be an issue; this filter does what it is supposed to do. We could add more parameters to make the behavior configurable.

        Filip Hanik added a comment -

        > there is still an opportunity to have a normal IOException interleaved with a Forced exception.

        This is not the case: the deregister operation is atomic and will never throw twice. However, the oneway method will not stop the underlying exception from propagating to the top, so only one exception is raised on the invoking thread.
        In the case of a 'forced timeout exception' there will be two exceptions: one invoked through onException, and a second when the async handler calls socket.close and terminates the write state.

        > As this is a filter that is added by choice it is not such a big deal but we may as well iron out the detail. This is a handy feature.

        Not only handy, but essential. It prevents the entire server from a complete deadlock due to synchronization in the higher layers.
        Once that synchronization has been mended, it is still a handy feature, since it prevents single threads from being locked for a very long time.

        Gary Tully added a comment -

        I am not sure it is safer, because the filter introduces a change of behaviour in the normal exception case, i.e. onException is now always called.
        In addition, in the event that a close is done asynchronously from an onException, there is still an opportunity to have a normal IOException interleaved with a Forced exception.
        I think this is the same as with a pass-through on exception: a close can get called twice, but this is handled ok by close.
        Mostly, though, I am wary of the change in behaviour introduced by the exception handler.
        As this is a filter that is added by choice it is not such a big deal, but we may as well iron out the detail. This is a handy feature.

        Filip Hanik added a comment -

        > one more thought: would it be sufficient to do deRegister in the finally and only have the timeout thread force an exception. I guess the question is, why is deRegister interested in the exception at all?

        The reason is that deregister is atomic; that way we close the socket for the correct reason, and do not make up a reason if a real socket exception and a timeout happen at the same time.

        I thought for a while about just ignoring the socket exception that came from the underlying layer and passing it on, but then I decided it would be safer to invoke onException, to guarantee socket closure regardless of the IOException that came back.

        Gary Tully added a comment -

        Ah, apologies, I did misread. I understand the problem, thanks.

        Still wondering about my last comment though: why is deRegisterWriter interested in exceptions?

        Filip Hanik added a comment - edited

        Here is another example of a thread locking up the entire system, based on the same scenario.

        "BrokerService" daemon prio=10 tid=0x0000000060103800 nid=0x74e7 runnable [0x00000000467c7000..0x00000000467c7c10]
           java.lang.Thread.State: RUNNABLE
        	at java.net.SocketOutputStream.socketWrite0(Native Method)
        	at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92)
        	at java.net.SocketOutputStream.write(SocketOutputStream.java:136)
        	at org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush(TcpBufferedOutputStream.java:106)
        	at java.io.DataOutputStream.flush(DataOutputStream.java:106)
        	at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:165)
        	at org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:233)
        	- locked <0x00002aaabe89c2b0> (a java.util.concurrent.atomic.AtomicBoolean)
        	at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:83)
        	at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:100)
        	at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
        	- locked <0x00002aaabe89cc10> (a java.lang.Object)
        	at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1188)
        	at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:776)
        	at org.apache.activemq.broker.TransportConnection.iterate(TransportConnection.java:813)
        	at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:122)
        	at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:43)
        	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        	at java.lang.Thread.run(Thread.java:619)
        

        and as a result other threads are blocked

        "ActiveMQ Transport Stopper: /xx.xx.xxx.xxx:61489" daemon prio=10 tid=0x00000000607ad400 nid=0x7687 waiting for monitor entry [0x00000000450b0000..0x00000000450b0c90]
           java.lang.Thread.State: BLOCKED (on object monitor)
        	at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
        	- waiting to lock <0x00002aaabe89cc10> (a java.lang.Object)
        	at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1188)
        	at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:776)
        	at org.apache.activemq.broker.TransportConnection.dispatchSync(TransportConnection.java:735)
        
        Filip Hanik added a comment -

        I am currently working on a test case so that you can see this in action.
        In real life, the following is the easiest way to reproduce:

        Have a few machines, with consumers and producers working against the same topic.

        On one consumer machine, pull out the network cord.

        This leaves the connection in the "ESTABLISHED" state, since no RST or FIN packets have been sent.
        The Linux TCP implementation will then try to resend the packet 15 times (default config), with up to 2 min between retransmissions.

        During this time, the server will keep filling the TCP send buffer in the kernel; when it is full, the thread gets stuck in java.net.SocketOutputStream.socketWrite0.
        Because this thread also holds the lock on the topic, no other thread can operate on the topic anymore.
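Editor's note on where the 15-to-30-minute figure comes from (a back-of-the-envelope sketch: Linux doubles the retransmission timeout on each attempt, capped at 120 s, and net.ipv4.tcp_retries2 defaults to 15; the initial RTO used here is an illustrative assumption, and a larger measured RTT pushes the total toward the 30-minute end):

```java
// Rough worst-case hang time before the Linux kernel abandons a dead session.
public class RetransmitBudget {
    static double totalSeconds(double initialRto, double capSeconds, int retries) {
        double rto = initialRto;
        double total = 0;
        for (int i = 0; i < retries; i++) {
            total += rto;                        // wait before each retransmission
            rto = Math.min(rto * 2, capSeconds); // exponential backoff, capped
        }
        return total;
    }

    public static void main(String[] args) {
        // With an assumed 0.2 s initial RTO this comes to roughly 13 minutes,
        // the low end of the 15-30 minute range observed in practice.
        double total = totalSeconds(0.2, 120, 15);
        System.out.printf("worst-case hang: ~%.0f s (~%.0f min)%n", total, total / 60);
    }
}
```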

        best
        Filip

        Gary Tully added a comment -

        one more thought:
        would it be sufficient to do deRegister in the finally and only have the timeout thread force an exception. I guess the question is, why is deRegister interested in the exception at all?

        Filip Hanik added a comment -
        if (fail) { 
          IOException ex = (iox!=null)?iox:new IOException("Forced write timeout for:"+filter.getNext().getRemoteAddress()); 
          filter.getTransportListener().onException(ex); 
        }
        
        Filip Hanik added a comment -

        thread name filter - patch 1

        Filip Hanik added a comment -

        maybe you misread the code

        if (fail) {
            IOException ex = (iox != null) ? iox : new IOException("Forced write timeout for:" + filter.getNext().getRemoteAddress());
            filter.getTransportListener().onException(ex);
        }

        Filip

        Gary Tully added a comment -

        on the tcp-writetimeout patch

        In the catch clause below:

        +    public void oneway(Object command) throws IOException {
        +        try {
        +            registerWrite(this);
        +            super.oneway(command);
        +        } catch (IOException x) {
        +            deRegisterWrite(this,true,x);
        +            throw x;
        +        } finally {
        

        fail is passed to deRegisterWrite, which will cause a throw of the Forced exception rather than x. It seems that deRegisterWrite ignores the IOException argument, which I guess is not intended, or is it?

        Would it be possible to include some junit tests for this functionality?

        Gary Tully added a comment -

        These fixes look great. Can you attach your patch files to this JIRA and select the appropriate license grant option?


          People

          • Assignee:
            Gary Tully
            Reporter:
            Filip Hanik
          • Votes:
            0
            Watchers:
            2
