Index: C:/src/Eclipse/Workspaces/amq-4.1-branch/activemq-4.1-branch/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java =================================================================== --- C:/src/Eclipse/Workspaces/amq-4.1-branch/activemq-4.1-branch/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java (revision 585924) +++ C:/src/Eclipse/Workspaces/amq-4.1-branch/activemq-4.1-branch/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java (working copy) @@ -70,85 +70,76 @@ private void writeCheck() { - synchronized(writeChecker) { - if( inSend.get() ) { - log.trace("A send is in progress"); - return; - } + if( inSend.get() ) { + log.trace("A send is in progress"); + return; + } - if( !commandSent.get() ) { - log.trace("No message sent since last write check, sending a KeepAliveInfo"); - try { - next.oneway(new KeepAliveInfo()); - } catch (IOException e) { - onException(e); - } - } else { - log.trace("Message sent since last write check, resetting flag"); + if( !commandSent.get() ) { + log.trace("No message sent since last write check, sending a KeepAliveInfo"); + try { + next.oneway(new KeepAliveInfo()); + } catch (IOException e) { + onException(e); } - - commandSent.set(false); + } else { + log.trace("Message sent since last write check, resetting flag"); } + + commandSent.set(false); } private void readCheck() { - synchronized(readChecker) { - if( inReceive.get() ) { - log.trace("A receive is in progress"); - return; - } + if( inReceive.get() ) { + log.trace("A receive is in progress"); + return; + } - if( !commandReceived.get() ) { - log.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException."); - onException(new InactivityIOException("Channel was inactive for too long.")); - } else { - log.trace("Message received since last read check, resetting flag: "); - } - - commandReceived.set(false); + if( !commandReceived.get() ) { + log.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException."); + onException(new InactivityIOException("Channel was inactive for too long.")); + } else { + log.trace("Message received since last read check, resetting flag: "); } + commandReceived.set(false); } public void onCommand(Object command) { - synchronized(readChecker) { - inReceive.set(true); - try { - if( command.getClass() == WireFormatInfo.class ) { - synchronized( this ) { - remoteWireFormatInfo = (WireFormatInfo) command; - try { - startMonitorThreads(); - } catch (IOException e) { - onException(e); - } + inReceive.set(true); + try { + if( command.getClass() == WireFormatInfo.class ) { + synchronized( this ) { + remoteWireFormatInfo = (WireFormatInfo) command; + try { + startMonitorThreads(); + } catch (IOException e) { + onException(e); } } - transportListener.onCommand(command); - } finally { - inReceive.set(false); - commandReceived.set(true); } + transportListener.onCommand(command); + } finally { + inReceive.set(false); + commandReceived.set(true); } } public void oneway(Object o) throws IOException { - synchronized(writeChecker) { - // Disable inactivity monitoring while processing a command. - inSend.set(true); - commandSent.set(true); - try { - if( o.getClass() == WireFormatInfo.class ) { - synchronized( this ) { - localWireFormatInfo = (WireFormatInfo) o; - startMonitorThreads(); - } + // Disable inactivity monitoring while processing a command. + inSend.set(true); + commandSent.set(true); + try { + if( o.getClass() == WireFormatInfo.class ) { + synchronized( this ) { + localWireFormatInfo = (WireFormatInfo) o; + startMonitorThreads(); } - next.oneway(o); - } finally { - inSend.set(false); } + next.oneway(o); + } finally { + inSend.set(false); } }