|
[
Permalink
| « Hide
]
Gary Tully added a comment - 03/Nov/08 01:21 PM
these fixes look great. Can you attach your patch files to this jira and select the appropriate license grant option.
on the tcp-writetimeout patch
In the catch clause below: + public void oneway(Object command) throws IOException { + try { + registerWrite(this); + super.oneway(command); + } catch (IOException x) { + deRegisterWrite(this,true,x); + throw x; + } finally { fail is passed to deRegisterWrite which will cause a throw of the Forced exception rather than x. It seems that deRegisterWrite ignores the IOException argument which I guess is not intended, or is it? Would it be possible to include some junit tests for this functionality? maybe you misread the code
if (fail) { Filip thread name filter - patch 1
if (fail) { IOException ex = (iox!=null)?iox:new IOException("Forced write timeout for:"+filter.getNext().getRemoteAddress()); filter.getTransportListener().onException(ex); } one more thought:
would it be sufficient to do deRegister in the finally and only have the timeout thread force an exception. I guess the question is, why is deRegister interested in the exception at all? I am currently working on a test case so that you can see this in action.
In the real life the following way is the easiest to reproduce Have a few machines, consumers and producers working with the same topic On one consumer machine - pull out the network cord This action causes the connection to be in "ESTABLISHED" state, since no RST or FIN packets have been sent. During this time, the server will keep filling up the TCP send buffer in the kernel, and when it is full, the thread will get stuck in java.net.Socket.socketWrite0 best Here is another example of a thread locking up the entire system, based on the same scenario.
"BrokerService" daemon prio=10 tid=0x0000000060103800 nid=0x74e7 runnable [0x00000000467c7000..0x00000000467c7c10] java.lang.Thread.State: RUNNABLE at java.net.SocketOutputStream.socketWrite0(Native Method) at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92) at java.net.SocketOutputStream.write(SocketOutputStream.java:136) at org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush(TcpBufferedOutputStream.java:106) at java.io.DataOutputStream.flush(DataOutputStream.java:106) at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:165) at org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:233) - locked <0x00002aaabe89c2b0> (a java.util.concurrent.atomic.AtomicBoolean) at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:83) at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:100) at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40) - locked <0x00002aaabe89cc10> (a java.lang.Object) at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1188) at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:776) at org.apache.activemq.broker.TransportConnection.iterate(TransportConnection.java:813) at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:122) at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:43) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:619) and as a result other threads are blocked "ActiveMQ Transport Stopper: /xx.xx.xxx.xxx:61489" daemon prio=10 tid=0x00000000607ad400 nid=0x7687 waiting for monitor entry [0x00000000450b0000..0x00000000450b0c90] java.lang.Thread.State: BLOCKED (on object monitor) at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40) - waiting to lock <0x00002aaabe89cc10> (a java.lang.Object) at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1188) at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:776) at org.apache.activemq.broker.TransportConnection.dispatchSync(TransportConnection.java:735) ah, apologies, I did mis read. I understand the problem, thanks.
Still wondering bout my last comment though, why is deRegisterWriter interested in exceptions?
The reason is because deregister is atomic, that way we close the socket for the correct reason, and not make up a reason if a real socket exception and a timeout happen at the same time. I thought about it for a while, to just ignore the socket exception that came from the underlying layer, and pass it on, but then I thought it would be safer to invoke the onException to guarantee a socket closure regardless of the IO exception that came back. Am not sure it is safer because the filter introduces a change of behaviour to the normal exception case. Ie: onException is now always called.
In addition, in the event that a close is done async from an onException, there is still an opportunity to have a normal IOException interleaved with a Forced exception. I think this is the same as with a pass through on exception, a close can get called twice, but this is handled ok by close. Mostly though, I am wary of the change in behaviour introduced by the exception handler. As this is a filter that is added by choice it is not such a big deal but we may as well iron out the detail. This is a handy feature.
This is not the case. the deregister operation is atomic. deregister will never call throw twice. However, the oneway method will not stop the normal exception from propagating the underlying exception to the top. so only one exception will raise on the invoking thread.
not only handy, but essential. It prevents the entire server from a complete dead lock due to synchronization in the higher layers. That being said, the upper layer can react to the propagating IO exception if needed. But that shouldn't be an issue, this filter does what it is supposed to do. We could add more parameters to make the behavior configurable.
agreed, will commit in an hour or so, once a sanity run of the tests completes.
patch applied in r710109
thanks. Index: activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java
looks like I have a copy paste error + try {
+ writeTimestamp = System.currentTimeMillis();
+ out.write(b, off, len);
+ } finally {
+ writeTimestamp = System.currentTimeMillis();
+ } just to be sure, the change is one line, right?:
Index: src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java
===================================================================
--- src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java (revision 711284)
+++ src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java (working copy)
@@ -95,7 +95,7 @@
writeTimestamp = System.currentTimeMillis();
out.write(b, off, len);
} finally {
- writeTimestamp = System.currentTimeMillis();
+ writeTimestamp = -1;
}
}
}
That's correct Gary, thank you!
These fixes will now make 5.2.0 rc3
Hi,
I am using ActiveMQ 5.3.0 and apparently run into this reported problem again. I attached the thread dump file for investigations. Unfortunately I can't reproduce this situation with a unit test. The only additional information I got from our client was that he tried to restarted the connection within very short period (1-2 sec). Felix, did you configure a timeout? transport.soWriteTimeout?
Example for configuring socket timeouts - transport.soTimeout applies to read, transport.soWriteTimeout applies to write. <transportConnector closing the WriteTimeoutFilter thread is not configured in the example stack trace.
|
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||