MINA / DIRMINA-1076

Leaking NioProcessors/NioSocketConnectors hanging in call to dispose

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.0.16
    • Fix Version/s: 2.0.17
    • Component/s: Core
    • Labels: None

    Description

      Follow-up to mailing list discussion.

      I am now able to reproduce the problem with a MINA test, using the brute-force approach of re-running one test in an endless loop.
      I have attached a patch of AbstractIoServiceTest (against https://github.com/apache/mina/tree/2.0) and a stack trace. After a few loops the test is stuck. You can see a lot of threads hanging in dispose() and the test is stuck when it tries to dispose the acceptor.

      What is a little strange is that the javadoc says that connector.dispose(TRUE) should not be called from an IoFutureListener, but in the test it is done anyway. However, changing the parameter to FALSE does not help either.

      Is there anything that can be done to prevent this hang?

      Attachments

        1. mina-dispose-hang.txt
          43 kB
          Christoph John
        2. mina-test-log.txt
          4 kB
          Christoph John
        3. mina-test-patch.txt
          3 kB
          Christoph John

          Activity

            johnnyv Jonathan Valliere added a comment -

            Okay, thank you.  I would like to have all the necessary information in one place.  Please attach the patch, the logs, and enough instructions for me to set up and reproduce the issue.

            I also need the following: OS version, JVM version, JVM options used for the test, processor model(s) of your target computer(s), the versions of MINA which are failing, and the Git locations.

            Basically, anything you can think of that could differ between your environment and mine.

            chrjohn Christoph John added a comment - - edited

            Java HotSpot(TM) 64-Bit Server VM (25.131-b11 mixed mode)
            Ubuntu Linux, Kernel 4.4.0-112-generic
            Intel(R) Core(TM) i5-2410M CPU @ 2.30GHz
            Tested against mina-2.0 from https://github.com/apache/mina/tree/2.0 so should be 2.0.17-SNAPSHOT

            I ran AbstractIoServiceTest from within Eclipse and have just attached the log file mina-test-log.txt (please note that this is a fresh run, so it does not correspond to the attached stack trace).


            johnnyv Jonathan Valliere added a comment -

            What are the details of the TravisCI server which deadlocks more often (as you posted above)?

            How long does it typically take to see a deadlock on your Core i5-2410M?

            chrjohn Christoph John added a comment - - edited

            I think you can leave the Travis CI server out of the equation now that I can reproduce it with the patched test.
            The test locks up almost instantly as you can see in the attached log file. These tests were done locally on my machine. Travis CI is not involved anymore.


            johnnyv Jonathan Valliere added a comment -

            What is the JDK version?  What Ubuntu version?

            What CPU governor are you using on Ubuntu on your i5-2410M?  You can install 'indicator-cpufreq' (https://apps.ubuntu.com/cat/applications/quantal/indicator-cpufreq/), which allows you to change your CPU frequency governor. If available, set it to your CPU's maximum (non-turbo) frequency and run the test again; otherwise set it to "Performance" and run the test again.  Because you are using a mobile chipset, I would like to make sure that the frequency scaler on your platform isn't a contributing factor that makes deadlocks happen more often.

            My goal is to get my environment to deadlock quickly and often, just like yours.

            I'm going to set this up for tomorrow.  If I can't get the results I want on my desktop, I'll set up a 2-core VM to try it from.

            chrjohn Christoph John added a comment -

            The exact version is actually Xubuntu 16.04. JDK version is 1.8.0_162-b12.

            I'll have to check the CPU governor next week, as I don't have the necessary rights to change it. But it seems I am on powersave. :-/

            cat /sys/devices/system/cpu/cpu*/cpufreq/scaling_governor
            powersave
            powersave
            powersave
            powersave
            

            johnnyv Jonathan Valliere added a comment -

            Here is a patch that fixes your issue and also passes maven:test.  It is not the final patch, though, because I have some long-term concerns about it.

            diff --git a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
            index 79885fa..503df41 100644
            --- a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
            +++ b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
            @@ -661,7 +661,9 @@
             
                                 // And manage removed sessions
                                 nSessions -= removeSessions();
            -
            +                    
            +                    assert nSessions > -1 : "Number of Sessions has gone negative";
            +
                                 // Last, not least, send Idle events to the idle sessions
                                 notifyIdleSessions(currentTime);
             
            @@ -695,9 +697,8 @@
                                     for (Iterator<S> i = allSessions(); i.hasNext();) {
                                         IoSession session = i.next();
             
            -                            scheduleRemove((S) session);
            -
                                         if (session.isActive()) {
            +                                scheduleRemove((S) session);
                                             hasKeys = true;
                                         }
                                     }

            Emmanuel, while this patch fixes the problem Christoph is seeing, I am concerned that relying on this change could cause a problem where SelectionKey.isValid() (i.e. `session.isActive()`) is false but the Session was not previously removed correctly.  AbstractPollingIoProcessor relies on the validity of the SelectionKey and the Set<SelectionKey> to manage active sessions internally. That is a really bad move, because SelectionKeys can become invalid entirely independently of the processor, which could leave pending write data and other things hanging forever.  Fundamentally, the problem is that in the 2.0 master Sessions are being removed TWICE from the processor, leaving the session counter negative.  One way to fix the problem safely is to store all the Sessions for the IoProcessor in a List and use that as the master list of 'active' Sessions, but I don't know whether there is some lazy Session stealing between IoProcessors in MINA which could conflict with that idea.

            johnnyv Jonathan Valliere added a comment - - edited

            Notes:

            I've thought about it and have come up with the following patch, which resolves my concern about the previous patch.  I was attempting to find a solution which made the smallest number of changes needed to resolve the problem.  As it turns out, moving the isDisposing() check above removeSessions() resolves the double-removal issue caused by blindly scheduling removals based on the Selector's keys().

            I believe the approach in the following patch is safe due to the nature of the disposing state: since disposal is immediate and we don't care about letting Sessions soft-close, there is no need to wake up the Selector.

            What was happening:

            • process(Session) would cause the session to remove itself via scheduleRemove
            • removeSessions removes all sessions scheduled via scheduleRemove
            • The isDisposing() block would pass ALL sessions from selector.keys() to scheduleRemove, causing the previously self-removed session to be scheduled again. (selector.keys() does not update until the next time select() is called.)  The next time removeSessions is called, the session counter goes negative.  The self-destruct routine checks for (session count == 0), which is never satisfied because the session count is a negative number.

            This patch works by moving the isDisposing() block before removeSessions, relying on the contains(Session) check in scheduleRemove to prevent a single Session from being added to the list twice.
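
The sequence above can be replayed with a small stand-alone model. This is only a sketch under simplifying assumptions: the class `DoubleRemovalDemo`, the `String` session names, and the single-pass `runLoop` method are hypothetical and are not MINA code; only the names `scheduleRemove` and `removeSessions` mirror the methods discussed here.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashSet;
import java.util.Queue;
import java.util.Set;

/** Toy model of the processor loop; hypothetical, not MINA code. */
final class DoubleRemovalDemo {

    /** Replays the sequence once and returns the final session counter. */
    static int runLoop(boolean disposeBlockFirst) {
        // Stale view of the sessions, standing in for selector.keys().
        Set<String> selectorKeys = new LinkedHashSet<>();
        Queue<String> removingSessions = new ArrayDeque<>();
        selectorKeys.add("session-1");
        int nSessions = 1;

        // process(session): the session closes and schedules its own removal.
        scheduleRemove(removingSessions, "session-1");

        if (disposeBlockFirst) {
            // Patched order: the contains() check filters the duplicate.
            for (String s : selectorKeys) {
                scheduleRemove(removingSessions, s);
            }
            nSessions -= removeSessions(removingSessions);
        } else {
            // Original order: the queue is drained first, so the stale key
            // is scheduled a second time and removed again on the next pass.
            nSessions -= removeSessions(removingSessions);
            for (String s : selectorKeys) {
                scheduleRemove(removingSessions, s);
            }
            nSessions -= removeSessions(removingSessions);
        }
        return nSessions;
    }

    /** Adds the session unless it is already queued. */
    static void scheduleRemove(Queue<String> queue, String session) {
        if (!queue.contains(session)) {
            queue.add(session);
        }
    }

    /** Drains the queue and returns how many sessions were removed. */
    static int removeSessions(Queue<String> queue) {
        int removed = 0;
        while (queue.poll() != null) {
            removed++;
        }
        return removed;
    }

    public static void main(String[] args) {
        System.out.println("original order: " + runLoop(false));
        System.out.println("patched order:  " + runLoop(true));
    }
}
```

In this model the original ordering ends with a counter of -1, so a check for (session count == 0) can never succeed, while the patched ordering ends at 0.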

            Patch to prevent double-removal of a single session which causes the counter to go negative preventing the processor from disposing.

            diff --git a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
            index 79885fa..2715b98 100644
            --- a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
            +++ b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
            @@ -659,9 +659,20 @@
                                 long currentTime = System.currentTimeMillis();
                                 flush(currentTime);
             
            +                    // Disconnect all sessions immediately if disposal has been
            +                    // requested so that we exit this loop eventually.
            +                    if (isDisposing()) {
            +                        for (Iterator<S> i = allSessions(); i.hasNext();) {
            +                            IoSession session = i.next();
            +                            scheduleRemove((S) session);
            +                        }
            +                    }
            +                    
                                 // And manage removed sessions
                                 nSessions -= removeSessions();
            -
            +                    
            +                    assert nSessions > -1 : "Internal Session Count is Negative";
            +                    
                                 // Last, not least, send Idle events to the idle sessions
                                 notifyIdleSessions(currentTime);
             
            @@ -685,26 +696,6 @@
                                     }
             
                                     assert processorRef.get() == this;
            -                    }
            -
            -                    // Disconnect all sessions immediately if disposal has been
            -                    // requested so that we exit this loop eventually.
            -                    if (isDisposing()) {
            -                        boolean hasKeys = false;
            -
            -                        for (Iterator<S> i = allSessions(); i.hasNext();) {
            -                            IoSession session = i.next();
            -
            -                            scheduleRemove((S) session);
            -
            -                            if (session.isActive()) {
            -                                hasKeys = true;
            -                            }
            -                        }
            -
            -                        if (hasKeys) {
            -                            wakeup();
            -                        }
                                 }
                             } catch (ClosedSelectorException cse) {
                                 // If the selector has been closed, we can exit the loop
            
            

             

            I had to change Christoph's AbstractIoServiceTest patch because it calls connector.dispose(true), which the documentation and the comments above say should not be done within an IoFutureListener.  The reason I had to change from true to false is that dispose(true) blocks on itself, preventing the IoProcessor thread from ever stopping.  In the described test, this causes the application to keep creating threads until the OS refuses to create any more.
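
As a rough analogy for why a blocking dispose cannot complete when it is called from the thread it is waiting for, the sketch below uses a plain `ExecutorService` rather than MINA. The class `SelfWaitDemo` and the method `awaitFromWorker` are hypothetical names, and the 200 ms timeout exists only so that the demo returns instead of hanging.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Analogy only: a worker thread waiting for its own pool to terminate. */
final class SelfWaitDemo {

    /** Returns whether termination was observed from inside the worker. */
    static boolean awaitFromWorker() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Future<Boolean> inside = worker.submit(() -> {
                worker.shutdown();
                // The pool cannot terminate while this task is still running,
                // so the wait can only end by timing out.
                return worker.awaitTermination(200, TimeUnit.MILLISECONDS);
            });
            return inside.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("terminated while waiting on itself: " + awaitFromWorker());
    }
}
```

Without the timeout, the wait inside the task would never return, which is comparable to the self-blocking described above.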

            Patch to make the test run in a loop

            diff --git a/mina-core/src/test/java/org/apache/mina/core/service/AbstractIoServiceTest.java b/mina-core/src/test/java/org/apache/mina/core/service/AbstractIoServiceTest.java
            index 2d70f8e..325c4e4 100644
            --- a/mina-core/src/test/java/org/apache/mina/core/service/AbstractIoServiceTest.java
            +++ b/mina-core/src/test/java/org/apache/mina/core/service/AbstractIoServiceTest.java
            @@ -30,6 +30,7 @@
             import org.apache.mina.filter.logging.LoggingFilter;
             import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
             import org.apache.mina.transport.socket.nio.NioSocketConnector;
            +import org.apache.mina.util.AvailablePortFinder;
             import org.junit.Test;
             import org.slf4j.Logger;
             import org.slf4j.LoggerFactory;
            @@ -48,16 +49,15 @@
              */
             public class AbstractIoServiceTest {
             
            -    private static final int PORT = 9123;
            -
                 @Test
                 public void testDispose() throws IOException, InterruptedException {
             
            +        while ( true) {
                     List<String> threadsBefore = getThreadNames();
             
                     final IoAcceptor acceptor = new NioSocketAcceptor();
             
            -        acceptor.getFilterChain().addLast("logger", new LoggingFilter());
            +     //   acceptor.getFilterChain().addLast("logger", new LoggingFilter());
                     acceptor.getFilterChain().addLast("codec",
                             new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
             
            @@ -65,7 +65,8 @@
             
                     acceptor.getSessionConfig().setReadBufferSize(2048);
                     acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
            -        acceptor.bind(new InetSocketAddress(PORT));
            +        int nextAvailable = AvailablePortFinder.getNextAvailable();
            +        acceptor.bind(new InetSocketAddress(nextAvailable));
                     System.out.println("Server running ...");
             
                     final NioSocketConnector connector = new NioSocketConnector();
            @@ -74,12 +75,12 @@
                     connector.setConnectTimeoutMillis(30 * 1000L);
             
                     connector.setHandler(new ClientHandler());
            -        connector.getFilterChain().addLast("logger", new LoggingFilter());
            +      //  connector.getFilterChain().addLast("logger", new LoggingFilter());
                     connector.getFilterChain().addLast("codec",
                             new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
             
                     // Start communication.
            -        ConnectFuture cf = connector.connect(new InetSocketAddress("localhost", 9123));
            +        ConnectFuture cf = connector.connect(new InetSocketAddress("localhost", nextAvailable));
                     cf.awaitUninterruptibly();
             
                     IoSession session = cf.getSession();
            @@ -103,7 +104,9 @@
                         public void operationComplete(IoFuture future) {
                             System.out.println("managed session count=" + connector.getManagedSessionCount());
                             System.out.println("Disposing connector ...");
            -                connector.dispose(true);
            +                // the doc states that the following should not be called with parameter TRUE from an IoFutureListener?!
            +                // on the other hand, using FALSE does not work either
            +                connector.dispose(false);    
                             System.out.println("Disposing connector ... *finished*");
             
                         }
            @@ -114,11 +117,11 @@
             
                     List<String> threadsAfter = getThreadNames();
             
            -        System.out.println("threadsBefore = " + threadsBefore);
            -        System.out.println("threadsAfter  = " + threadsAfter);
            +       // System.out.println("threadsBefore = " + threadsBefore);
            +       // System.out.println("threadsAfter  = " + threadsAfter);
             
                     // Assert.assertEquals(threadsBefore, threadsAfter);
            -
            +    }
                 }
             
                 public static class ClientHandler extends IoHandlerAdapter {
            

             

             

            chrjohn Christoph John added a comment -

            Hi Jonathan,

            many thanks for the quick turnaround. Much appreciated!

            I have tested your changes and it is looking very good so far. I have only a few more questions:
            In the AbstractIoServiceTest, calling dispose(true) from outside an IoFutureListener works as expected (i.e. all threads are cleaned up). Calling dispose(true) from an IoFutureListener leads to more and more threads being created (as you noticed). The question is whether there is some way to programmatically prevent dispose(true) from being called from an IoFutureListener. The way it is now might lead to subtle problems (although only in very unlikely scenarios). And in my experience, people tend not to read the JavaDocs.

            Is this problem (that you fixed) more likely to happen when a session gets closed unexpectedly? Because in our (QuickFIX/J) test suite the problem often seemed to manifest itself right after there was a WriteToClosedSessionException.

            Is this issue going to be fixed in MINA 2.0.17? Is there any ETA for MINA 2.0.17?
            (There also is DIRMINA-1057 which fixes another problem in QuickFIX/J)

            Many thanks again for your help on this.

            Cheers,
            Chris.


            The proposed patch makes a lot of sense. There is no reason for managing removed sessions when the service is being disposed anyway.

            elecharny Emmanuel Lécharny added a comment

            Christoph,

            There could be a way to detect the potential deadlock when calling dispose(true), but changing the behavior to avoid the deadlock would violate the dispose(true) contract.  I think MINA already has some other deadlock detection that throws an exception; I'll have to look into that.  The only thing I would feel comfortable doing is throwing an exception indicating that the requested action will cause a deadlock.
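To make the "throw instead of deadlock" idea concrete, here is a minimal sketch. It is not MINA code: `GuardedService`, `fireListener` and the `ThreadLocal` marker are hypothetical names invented for illustration, and the sketch assumes that listener callbacks run on a thread that the blocking dispose would have to wait for.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Hypothetical sketch, not MINA code: a service that refuses a blocking
 * dispose when the call comes from one of its own listener callbacks.
 */
class GuardedService {
    // True while the current thread is executing a listener callback.
    private static final ThreadLocal<Boolean> IN_LISTENER =
            ThreadLocal.withInitial(() -> Boolean.FALSE);

    private final AtomicBoolean disposed = new AtomicBoolean(false);

    /** Runs a listener callback and marks the calling thread for its duration. */
    void fireListener(Runnable listener) {
        boolean previous = IN_LISTENER.get();
        IN_LISTENER.set(Boolean.TRUE);
        try {
            listener.run();
        } finally {
            // Restore the previous marker even if the listener throws.
            IN_LISTENER.set(previous);
        }
    }

    /** Fails fast when a blocking dispose is requested from inside a listener. */
    void dispose(boolean awaitTermination) {
        if (awaitTermination && IN_LISTENER.get()) {
            throw new IllegalStateException(
                    "dispose(true) called from a listener callback would deadlock");
        }
        disposed.set(true);
    }

    boolean isDisposed() {
        return disposed.get();
    }
}
```

With this shape, `dispose(false)` from a listener and `dispose(true)` from any other thread still work; only the combination that cannot complete is rejected, which keeps the dispose(true) contract unchanged for valid callers.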

            johnnyv Jonathan Valliere added a comment

            Why Patch 2.0 doesn't fully work

            It seems that Patch 2.0 had a problem: when scheduleRemove(Session) was called concurrently, the Session could be added to the removingSessions queue twice despite the contains(Session) check, because both threads could call contains(Session) and get false before either of them had added the Session.  Queue lacks atomic operations like putIfAbsent, which makes the problem harder to solve.
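For illustration only, the check-then-act race described above disappears when the "is it already scheduled?" test and the insert are a single atomic step. The sketch below is not what the patch does; it swaps in a concurrent `Set` as a guard, and `RemovalScheduler`, `scheduled` and `raceOnce` are hypothetical names. It addresses only the double enqueue, not any later re-scheduling of a session that has already been removed.

```java
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;

/** Hypothetical sketch: an atomic guard in front of the removal queue. */
class RemovalScheduler<S> {
    private final Queue<S> removingSessions = new ConcurrentLinkedQueue<>();
    // Set.add() on a concurrent key set is one atomic test-and-insert.
    private final Set<S> scheduled = ConcurrentHashMap.newKeySet();

    /** Enqueues the session only if no other caller has scheduled it yet. */
    boolean scheduleRemove(S session) {
        if (scheduled.add(session)) {
            removingSessions.add(session);
            return true;
        }
        return false;
    }

    int pending() {
        return removingSessions.size();
    }

    /** Lets several threads schedule the same session at once and returns the queue size. */
    static int raceOnce(int threads) {
        RemovalScheduler<String> scheduler = new RemovalScheduler<>();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    start.await(); // release all workers together
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                scheduler.scheduleRemove("session-1");
            });
            workers[i].start();
        }
        start.countDown();
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return scheduler.pending();
    }
}
```

However many threads call `scheduleRemove` for the same session, exactly one `add` on the guard set returns `true`, so the queue ends up with a single entry.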

            Option 1

            One option would be to synchronize the entire code block; however, I felt that it wasn't the safest option available, because scheduleRemove(Session) could still be called by an external actor after the Processor has executed removeSessions() but before the for(...) loop cycles, causing the Session to be removed twice again even if selector.keys() was used as a reference point.  The sequential operations would not be concurrent relative to each other unless synchronized blocks were added all over the place.

            Option 2

            The safest option to avoid this is to create a new queue which contains the active sessions only.  The new queue serves as an atomic reference of whether a Session was already removed or not.
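A minimal sketch of why the active-sessions queue can act as that atomic reference: `ConcurrentLinkedQueue.remove(Object)` returns `true` only for the call that actually removes the element, so the destroy step can be gated on its result. `ActiveSessionGate` and its counter are hypothetical stand-ins, not the patched MINA classes, and `addNow` is assumed to be called from a single thread, as in a processor loop.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of gating removal on membership in an active-sessions queue. */
class ActiveSessionGate<S> {
    private final Queue<S> activeSessions = new ConcurrentLinkedQueue<>();
    private final AtomicInteger destroyed = new AtomicInteger();

    /** Registers a session once; assumed to be called from a single thread. */
    boolean addNow(S session) {
        if (activeSessions.contains(session)) {
            return false;
        }
        return activeSessions.add(session);
    }

    /** Destroys the session only if this call is the one that removed it. */
    boolean removeNow(S session) {
        if (activeSessions.remove(session)) {
            destroyed.incrementAndGet(); // stand-in for destroy(session)
            return true;
        }
        return false;
    }

    int destroyedCount() {
        return destroyed.get();
    }
}
```

Calling `removeNow` twice for the same session destroys it once: the second call finds nothing to remove and returns `false`, so a session counter decremented on `true` cannot go negative through a repeated removal.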

            Option 2, Liability

            The new Queue could become a memory leak if Sessions are added or removed in some way without calling addNow(Session) or removeNow(Session).

            Patch 3.0 (Option 2) - Passes Unit Tests

            diff --git a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
            index 79885fa..27e7d76 100644
            --- a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
            +++ b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
            @@ -23,6 +23,7 @@
             import java.net.PortUnreachableException;
             import java.nio.channels.ClosedSelectorException;
             import java.util.ArrayList;
            +import java.util.Collections;
             import java.util.Iterator;
             import java.util.List;
             import java.util.Queue;
            @@ -86,6 +87,9 @@
             
                 /** A Session queue containing the newly created sessions */
                 private final Queue<S> newSessions = new ConcurrentLinkedQueue<>();
            +    
            +    /** A queue used to store all active sessions */
            +    private final Queue<S> activeSessions = new ConcurrentLinkedQueue<>();
             
                 /** A queue used to store the sessions to be removed */
                 private final Queue<S> removingSessions = new ConcurrentLinkedQueue<>();
            @@ -239,7 +243,9 @@
                  *
                  * @return {@link Iterator} of {@link IoSession}
                  */
            -    protected abstract Iterator<S> allSessions();
            +    protected Iterator<S> allSessions() {
            +        return Collections.unmodifiableCollection(activeSessions).iterator();
            +    }
             
                 /**
                  * Get an {@link Iterator} for the list of {@link IoSession} found selected
            @@ -411,7 +417,7 @@
                 }
             
                 private void scheduleRemove(S session) {
            -        if (!removingSessions.contains(session)) {
            +        if (!removingSessions.contains(session) && activeSessions.contains(session)) {
                         removingSessions.add(session);
                     }
                 }
            @@ -659,9 +665,20 @@
                                 long currentTime = System.currentTimeMillis();
                                 flush(currentTime);
             
            +                    // Disconnect all sessions immediately if disposal has been
            +                    // requested so that we exit this loop eventually.
            +                    if (isDisposing()) {
            +                        for (Iterator<S> i = allSessions(); i.hasNext();) {
            +                            IoSession session = i.next();
            +                            scheduleRemove((S) session);
            +                        }
            +                    }
            +                    
                                 // And manage removed sessions
                                 nSessions -= removeSessions();
            -
            +                    
            +                    assert nSessions > -1 : "Internal Session Count is Negative";
            +                    
                                 // Last, not least, send Idle events to the idle sessions
                                 notifyIdleSessions(currentTime);
             
            @@ -685,26 +702,6 @@
                                     }
             
                                     assert processorRef.get() == this;
            -                    }
            -
            -                    // Disconnect all sessions immediately if disposal has been
            -                    // requested so that we exit this loop eventually.
            -                    if (isDisposing()) {
            -                        boolean hasKeys = false;
            -
            -                        for (Iterator<S> i = allSessions(); i.hasNext();) {
            -                            IoSession session = i.next();
            -
            -                            scheduleRemove((S) session);
            -
            -                            if (session.isActive()) {
            -                                hasKeys = true;
            -                            }
            -                        }
            -
            -                        if (hasKeys) {
            -                            wakeup();
            -                        }
                                 }
                             } catch (ClosedSelectorException cse) {
                                 // If the selector has been closed, we can exit the loop
            @@ -819,30 +816,41 @@
                     private boolean addNow(S session) {
                         boolean registered = false;
             
            -            try {
            -                init(session);
            -                registered = true;
            +        try {
            +        if (activeSessions.contains(session)) {
            +            return true;
            +        }
             
            -                // Build the filter chain of this session.
            -                IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();
            -                chainBuilder.buildFilterChain(session.getFilterChain());
            +        if (activeSessions.add(session)) {
            +            init(session);
            +            registered = true;
             
            -                // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
            -                // in AbstractIoFilterChain.fireSessionOpened().
            -                // Propagate the SESSION_CREATED event up to the chain
            -                IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners();
            -                listeners.fireSessionCreated(session);
            -            } catch (Exception e) {
            -                ExceptionMonitor.getInstance().exceptionCaught(e);
            +            // Build the filter chain of this session.
            +            IoFilterChainBuilder chainBuilder = session.getService()
            +                .getFilterChainBuilder();
            +            chainBuilder.buildFilterChain(session.getFilterChain());
             
            -                try {
            -                    destroy(session);
            -                } catch (Exception e1) {
            -                    ExceptionMonitor.getInstance().exceptionCaught(e1);
            -                } finally {
            -                    registered = false;
            -                }
            -            }
            +            // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside
            +            // here
            +            // in AbstractIoFilterChain.fireSessionOpened().
            +            // Propagate the SESSION_CREATED event up to the chain
            +            IoServiceListenerSupport listeners = ((AbstractIoService) session
            +                .getService()).getListeners();
            +            listeners.fireSessionCreated(session);
            +        }
            +        } catch (Exception e) {
            +        ExceptionMonitor.getInstance().exceptionCaught(e);
            +
            +        if (activeSessions.remove(session)) {
            +            try {
            +            destroy(session);
            +            } catch (Exception e1) {
            +            ExceptionMonitor.getInstance().exceptionCaught(e1);
            +            } finally {
            +            registered = false;
            +            }
            +        }
            +        }
             
                         return registered;
                     }
            @@ -850,40 +858,36 @@
                     private int removeSessions() {
                         int removedSessions = 0;
             
            -            for (S session = removingSessions.poll(); session != null; session = removingSessions.poll()) {
            -                SessionState state = getState(session);
            +        for (S session = removingSessions.poll(); session != null; session = removingSessions .poll()) {
            +        SessionState state = getState(session);
             
            -                // Now deal with the removal accordingly to the session's state
            -                switch (state) {
            -                case OPENED:
            -                    // Try to remove this session
            -                    if (removeNow(session)) {
            -                        removedSessions++;
            -                    }
            +        // Now deal with the removal accordingly to the session's
            +        // state
            +        switch (state) {
            +        case OPENED:
            +        case CLOSING:
            +            // Try to remove this session
            +            if (removeNow(session)) {
            +            removedSessions++;
            +            }
             
            -                    break;
            +            break;
             
            -                case CLOSING:
            -                    // Skip if channel is already closed
            -                    // In any case, remove the session from the queue
            -                    removedSessions++;
            -                    break;
            +        case OPENING:
            +            // Remove session from the newSessions queue and
            +            // remove it
            +            newSessions.remove(session);
             
            -                case OPENING:
            -                    // Remove session from the newSessions queue and
            -                    // remove it
            -                    newSessions.remove(session);
            +            if (removeNow(session)) {
            +            removedSessions++;
            +            }
             
            -                    if (removeNow(session)) {
            -                        removedSessions++;
            -                    }
            +            break;
             
            -                    break;
            -
            -                default:
            -                    throw new IllegalStateException(String.valueOf(state));
            -                }
            -            }
            +        default:
            +            throw new IllegalStateException(String.valueOf(state));
            +        }
            +        }
             
                         return removedSessions;
                     }
            @@ -1145,27 +1149,32 @@
                     }
             
                     private boolean removeNow(S session) {
            -            clearWriteRequestQueue(session);
            +        if (activeSessions.remove(session)) {
            +        clearWriteRequestQueue(session);
             
            -            try {
            -                destroy(session);
            -                return true;
            -            } catch (Exception e) {
            -                IoFilterChain filterChain = session.getFilterChain();
            -                filterChain.fireExceptionCaught(e);
            -            } finally {
            -                try {
            -                    clearWriteRequestQueue(session);
            -                    ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);
            -                } catch (Exception e) {
            -                    // The session was either destroyed or not at this point.
            -                    // We do not want any exception thrown from this "cleanup" code
            -                    // to change
            -                    // the return value by bubbling up.
            -                    IoFilterChain filterChain = session.getFilterChain();
            -                    filterChain.fireExceptionCaught(e);
            -                }
            -            }
            +        try {
            +            destroy(session);
            +            return true;
            +        } catch (Exception e) {
            +            IoFilterChain filterChain = session.getFilterChain();
            +            filterChain.fireExceptionCaught(e);
            +        } finally {
            +            try {
            +            clearWriteRequestQueue(session);
            +            ((AbstractIoService) session.getService())
            +                .getListeners().fireSessionDestroyed(session);
            +            } catch (Exception e) {
            +            // The session was either destroyed or not at this
            +            // point.
            +            // We do not want any exception thrown from this
            +            // "cleanup" code
            +            // to change
            +            // the return value by bubbling up.
            +            IoFilterChain filterChain = session.getFilterChain();
            +            filterChain.fireExceptionCaught(e);
            +            }
            +        }
            +        }
             
                         return false;
                     }
            diff --git a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
            index 3b0fa40..fc60124 100644
            --- a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
            +++ b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
            @@ -149,15 +149,20 @@
                     }
                 }
             
            +//    @Override
            +//    protected Iterator<NioSession> allSessions() {
            +//        selectorLock.readLock().lock();
            +//        
            +//        try {
            +//            return new IoSessionIterator(selector.keys());
            +//        } finally {
            +//            selectorLock.readLock().unlock();
            +//        }
            +//    }
            +    
                 @Override
                 protected Iterator<NioSession> allSessions() {
            -        selectorLock.readLock().lock();
            -        
            -        try {
            -            return new IoSessionIterator(selector.keys());
            -        } finally {
            -            selectorLock.readLock().unlock();
            -        }
            +        return super.allSessions();
                 }
             
                 @SuppressWarnings("synthetic-access")
            @@ -182,14 +187,13 @@
                 @Override
                 protected void destroy(NioSession session) throws Exception {
                     ByteChannel ch = session.getChannel();
            -        
                     SelectionKey key = session.getSelectionKey();
                     
                     if (key != null) {
                         key.cancel();
                     }
                     
            -        if ( ch.isOpen() ) {
            +        if (ch.isOpen() ) {
                         ch.close();
                     }
                 }
            
            johnnyv Jonathan Valliere added a comment

            A question: if the problem is the scheduleRemove() method being called twice, leading to the session being added twice to the queue, why don't we protect the queue by adding a lock around it, tying the contains and add calls together? Something like:

                private Lock sessionQueueLock = new ReentrantLock();
            ...
                private void scheduleRemove(S session) {
                    lock.lock();
            
                    try {
                        if (!removingSessions.contains(session)) {
                            removingSessions.add(session);
                    } finally {
                        lock.unlock();
                    }
                }
            

            This way, you can't add a session to the queue if it's already contained i the queue.

            WDYT ?

            elecharny Emmanuel Lécharny added a comment

            The problem is that there is no atomic way of knowing whether a Session has already been removed.  It's more than just "duplicates" inside the queue, because a session could be added back to the queue after it was removed.  The tests show that it is very possible for a session to be added to the remove queue and removed, and then for another thread to add it back to the remove queue.
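The re-queue scenario described above can be replayed in a small, single-threaded model. This is only a sketch under stated assumptions: `RemoveQueueRaceDemo`, its string "sessions" and its counter are hypothetical stand-ins and not MINA classes; the two calls that would come from different threads are simply executed in the problematic order.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified model of a processor with a guarded remove queue.
// None of these names are taken from MINA itself.
final class RemoveQueueRaceDemo {
    private final Queue<String> removingSessions = new ArrayDeque<>();
    private final Lock queueLock = new ReentrantLock();
    private int nSessions;

    RemoveQueueRaceDemo(int initialSessions) {
        this.nSessions = initialSessions;
    }

    // Lock-guarded contains-then-add: rejects a duplicate only while the
    // first entry is still sitting in the queue.
    void scheduleRemove(String session) {
        queueLock.lock();
        try {
            if (!removingSessions.contains(session)) {
                removingSessions.add(session);
            }
        } finally {
            queueLock.unlock();
        }
    }

    // Models the processor loop: drain the queue and count every drained
    // entry as one removed session.
    int removeSessions() {
        int removed = 0;
        queueLock.lock();
        try {
            while (removingSessions.poll() != null) {
                removed++;
            }
        } finally {
            queueLock.unlock();
        }
        nSessions -= removed;
        return removed;
    }

    int sessionCount() {
        return nSessions;
    }

    public static void main(String[] args) {
        RemoveQueueRaceDemo processor = new RemoveQueueRaceDemo(1);
        processor.scheduleRemove("session-1");
        processor.scheduleRemove("session-1"); // rejected: still queued
        processor.removeSessions();            // counter drops to 0
        processor.scheduleRemove("session-1"); // late request: accepted again
        processor.removeSessions();            // counter drops below 0
        System.out.println(processor.sessionCount());
    }
}
```

In this model the lock removes duplicates that coexist in the queue, but the counter is still decremented twice for one session once the first entry has been drained.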

            I covered that as an option under the heading Option 1.  The problem cannot be fixed using locks unless both scheduleRemove() and everything after select() inside processor.run() are inside the lock.  It would require that scheduleRemove() only happen while the processor is performing a select(), in order to guarantee ordering.  Even with two large critical sections, there is the nasty problem of having to use selector.keys() to verify whether the session is actually registered with that processor.  To fix it correctly with locks might require extra selectNow() calls in order to update selector.keys().  It's just ugly.

             

            The other alternative might be to completely gut the nSessions mechanic and just use selector.keys().size() to determine when to dispose of the processor.

            johnnyv Jonathan Valliere added a comment

            Right. One of the problems is that the session state is always computed; it is never held by the session. Otherwise we would have been able to check whether the session is currently being removed, forbidding another removal... Option 2 is trying to mimic that.

            Another possibility might be to rethink the way the session state is handled :

            • We add a new state (REMOVING) to the SessionState enum
            • We add a field in the NioSession class which holds the session's state
            • We set/check this state when we call getState or when we call scheduleRemove

            It may be less likely to create a memory leak.
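A minimal sketch of that idea, assuming the state is stored in an `AtomicReference` so that the transition to REMOVING can be claimed by exactly one caller. `SketchState`, `SketchSession` and `SketchProcessor` are hypothetical names used for illustration only; they are not the MINA `SessionState` or `NioSession` types.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical states; the real SessionState enum is not reproduced here.
enum SketchState { OPENED, REMOVING, CLOSED }

// Hypothetical session that holds its own state instead of computing it.
final class SketchSession {
    private final AtomicReference<SketchState> state =
            new AtomicReference<>(SketchState.OPENED);

    // Only one caller can win the OPENED -> REMOVING transition.
    boolean markRemoving() {
        return state.compareAndSet(SketchState.OPENED, SketchState.REMOVING);
    }

    void markClosed() {
        state.set(SketchState.CLOSED);
    }

    SketchState getState() {
        return state.get();
    }
}

// Hypothetical processor that queues a session only if it won the transition.
final class SketchProcessor {
    private final Queue<SketchSession> removingSessions = new ConcurrentLinkedQueue<>();

    boolean scheduleRemove(SketchSession session) {
        if (!session.markRemoving()) {
            return false; // already being removed, or already closed
        }
        removingSessions.add(session);
        return true;
    }

    // Drains the queue; each session can appear here at most once.
    int removeSessions() {
        int removed = 0;
        for (SketchSession s = removingSessions.poll(); s != null; s = removingSessions.poll()) {
            s.markClosed();
            removed++;
        }
        return removed;
    }
}
```

Because the state never returns to OPENED in this sketch, a late scheduleRemove() after the queue has been drained is rejected as well, which is the case a lock around the queue alone does not cover.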

            At this point, it's just an idea, I don't foresee all the pros and cons of this approach, I need to sleep over it (it's 5AM atm...)

            elecharny Emmanuel Lécharny added a comment

            In the previous post, I mentioned the possibility of totally removing the nSessions state.

            Option 3
            Completely removes the nSessions state because it is corruptible.  Relies entirely on selector.keys().size() for the state.  This is actually the simplest of all the alternatives, as it requires no new atomic mechanisms.  The only drawback is that it might take longer to dispose() the IoProcessor, due to the extra select() cycle that must occur to update the session count.

            diff --git a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
            index 79885fa..c807bf1 100644
            --- a/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
            +++ b/mina-core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
            @@ -240,6 +240,13 @@
                  * @return {@link Iterator} of {@link IoSession}
                  */
                 protected abstract Iterator<S> allSessions();
            +    
            +    /**
            +     * Get the number of {@Link IoSession} polled by this {@Link IoProcessor}
            +     *
            +     * @return the number of sessions attached to this {@Link IoProcessor}
            +     */
            +    protected abstract int allSessionsCount();
             
                 /**
                  * Get an {@link Iterator} for the list of {@link IoSession} found selected
            @@ -596,7 +603,6 @@
                     public void run() {
                         assert processorRef.get() == this;
             
            -            int nSessions = 0;
                         lastIdleCheckTime = System.currentTimeMillis();
                         int nbTries = 10;
             
            @@ -641,9 +647,31 @@
                                 } else {
                                     nbTries = 10;
                                 }
            -
            +                    
                                 // Manage newly created session first
            -                    nSessions += handleNewSessions();
            +                    if(handleNewSessions() == 0) {
            +                        // Get a chance to exit the infinite loop if there are no
            +                        // more sessions on this Processor
            +                        if (allSessionsCount() == 0) {
            +                            processorRef.set(null);
            +
            +                            if (newSessions.isEmpty() && isSelectorEmpty()) {
            +                                // newSessions.add() precedes startupProcessor
            +                                assert processorRef.get() != this;
            +                                break;
            +                            }
            +
            +                            assert processorRef.get() != this;
            +
            +                            if (!processorRef.compareAndSet(null, this)) {
            +                                // startupProcessor won race, so must exit processor
            +                                assert processorRef.get() != this;
            +                                break;
            +                            }
            +
            +                            assert processorRef.get() == this;
            +                        }
            +                    }
             
                                 updateTrafficMask();
             
            @@ -654,39 +682,17 @@
                                     // the MDCFilter test...
                                     process();
                                 }
            -
            +                    
                                 // Write the pending requests
                                 long currentTime = System.currentTimeMillis();
                                 flush(currentTime);
            -
            -                    // And manage removed sessions
            -                    nSessions -= removeSessions();
            -
            +                    
                                 // Last, not least, send Idle events to the idle sessions
                                 notifyIdleSessions(currentTime);
            -
            -                    // Get a chance to exit the infinite loop if there are no
            -                    // more sessions on this Processor
            -                    if (nSessions == 0) {
            -                        processorRef.set(null);
            -
            -                        if (newSessions.isEmpty() && isSelectorEmpty()) {
            -                            // newSessions.add() precedes startupProcessor
            -                            assert processorRef.get() != this;
            -                            break;
            -                        }
            -
            -                        assert processorRef.get() != this;
            -
            -                        if (!processorRef.compareAndSet(null, this)) {
            -                            // startupProcessor won race, so must exit processor
            -                            assert processorRef.get() != this;
            -                            break;
            -                        }
            -
            -                        assert processorRef.get() == this;
            -                    }
            -
            +                    
            +                    // And manage removed sessions
            +                    removeSessions();
            +                    
                                 // Disconnect all sessions immediately if disposal has been
                                 // requested so that we exit this loop eventually.
                                 if (isDisposing()) {
            @@ -702,9 +708,7 @@
                                         }
                                     }
             
            -                        if (hasKeys) {
            -                            wakeup();
            -                        }
            +                        wakeup();
                                 }
                             } catch (ClosedSelectorException cse) {
                                 // If the selector has been closed, we can exit the loop
            diff --git a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
            index 3b0fa40..9948d7a 100644
            --- a/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
            +++ b/mina-core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
            @@ -159,6 +159,12 @@
                         selectorLock.readLock().unlock();
                     }
                 }
            +    
            +    @Override
            +    protected int allSessionsCount()
            +    {
            +        return selector.keys().size();
            +    }
             
                 @SuppressWarnings("synthetic-access")
                 @Override
            diff --git a/mina-core/src/test/java/org/apache/mina/transport/socket/nio/PollingIoProcessorTest.java b/mina-core/src/test/java/org/apache/mina/transport/socket/nio/PollingIoProcessorTest.java
            index 379f55b..f289aaf 100644
            --- a/mina-core/src/test/java/org/apache/mina/transport/socket/nio/PollingIoProcessorTest.java
            +++ b/mina-core/src/test/java/org/apache/mina/transport/socket/nio/PollingIoProcessorTest.java
            @@ -61,6 +61,11 @@
                         protected Iterator<NioSession> allSessions() {
                             return proc.allSessions();
                         }
            +            
            +            @Override
            +            protected int allSessionsCount() {
            +                return proc.allSessionsCount();
            +            }
             
                         @Override
                         protected void destroy(NioSession session) throws Exception {
            
            johnnyv Jonathan Valliere added a comment
            elecharny Emmanuel Lécharny added a comment - edited

            The problem is that selector.keys() is not thread-safe :/ There is no guarantee that a call to size() will return the correct number of active sessions...


            When called from the IoProcessor worker thread, it is safe. All additions and removals must occur on the IoProcessor worker thread. Sessions are atomically queued for add and remove, and the queues are processed only inside the IoProcessor worker thread. If you were to call selector.keys().size() anywhere else, it would not be safe.
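The confinement pattern described here can be sketched without a real selector. In the hypothetical `ConfinedRegistry` below, a plain `HashSet` stands in for the selector's key set: any thread may enqueue add or remove requests, but only the thread that calls `cycle()` ever touches the set or reads its size. All names are assumptions made for illustration.

```java
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical model: the HashSet plays the role of selector.keys().
final class ConfinedRegistry {
    // Thread-safe hand-off queues, written by any thread.
    private final Queue<String> toAdd = new ConcurrentLinkedQueue<>();
    private final Queue<String> toRemove = new ConcurrentLinkedQueue<>();

    // Not thread-safe: must only be touched from cycle(), i.e. by the worker.
    private final Set<String> registered = new HashSet<>();

    void add(String session) {      // callable from any thread
        toAdd.add(session);
    }

    void remove(String session) {   // callable from any thread
        toRemove.add(session);
    }

    // One pass of the worker loop; must be called by the worker thread only.
    int cycle() {
        for (String s = toAdd.poll(); s != null; s = toAdd.poll()) {
            registered.add(s);
        }
        for (String s = toRemove.poll(); s != null; s = toRemove.poll()) {
            registered.remove(s); // removing twice is harmless for a set
        }
        return registered.size();
    }

    public static void main(String[] args) throws InterruptedException {
        ConfinedRegistry registry = new ConfinedRegistry();
        Thread producer = new Thread(() -> {
            registry.add("a");
            registry.add("b");
            registry.remove("a");
            registry.remove("a"); // duplicate removal request
        });
        producer.start();
        producer.join();
        // The main thread acts as the worker here.
        System.out.println(registry.cycle());
    }
}
```

Since the count is derived from the set rather than maintained as a separate counter, a duplicate removal request cannot corrupt it in this model.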

            johnnyv Jonathan Valliere added a comment

            Right, right. I kind of like this option 3. Regarding the extra select() call when disposing the selector, I'm not sure we need to do an extra select loop: the count should be 0 when we dispose the selector anyway... Or maybe I'm missing something?

            elecharny Emmanuel Lécharny added a comment

            selector.close() occurs inside the IoProcessor worker thread, and it only happens when the event loop exits (unless called directly through a non-public function). Therefore, the number of keys must be 0 before IoProcessor will call selector.close(). The extra select() loop is there to cause the selector to update selector.keys().size(). I think it's fine the way it is. If it concerns you, we can change the loop timeout to something smaller, like 250ms.

            johnnyv Jonathan Valliere added a comment

            Calling wakeup() should make the next call to select() non-blocking, so the extra select() loop would not be an issue.

            Have you tested option 3 against unit tests ?

            elecharny Emmanuel Lécharny added a comment

            wakeup() actually doesn't mitigate the next select() at all; looking at the code in NioProcessor#select(timeout), wakeupCalled isn't even referenced. I did force a wakeup() inside the isDisposing() block at the end of the loop to mitigate the next select(), in case I was misunderstanding it. If you wanted to mitigate the next select(), you would have to call selectNow() instead. It has to be called, one way or another. It would be pretty simple to add: if wakeupCalled, then selectNow(), else select().
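The branch suggested in the last sentence could look roughly like the sketch below. `WakeupAwareSelect` is a hypothetical helper written for illustration; only `Selector.wakeup()`, `Selector.selectNow()` and `Selector.select(long)` are real `java.nio` calls, and the way the flag is stored and reset is an assumption rather than MINA's actual code.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper: picks selectNow() when a wakeup was requested since
// the last selection, and a timed select() otherwise.
final class WakeupAwareSelect {
    private final Selector selector;
    private final AtomicBoolean wakeupCalled = new AtomicBoolean(false);

    WakeupAwareSelect(Selector selector) {
        this.selector = selector;
    }

    // Records the request, then wakes the selector in case it is blocked.
    void wakeup() {
        wakeupCalled.set(true);
        selector.wakeup();
    }

    // Consumes the flag: a pending wakeup turns this call into a
    // non-blocking selectNow(); otherwise it blocks up to timeoutMillis.
    int select(long timeoutMillis) throws IOException {
        if (wakeupCalled.getAndSet(false)) {
            return selector.selectNow();
        }
        return selector.select(timeoutMillis);
    }

    public static void main(String[] args) throws IOException {
        try (Selector selector = Selector.open()) {
            WakeupAwareSelect helper = new WakeupAwareSelect(selector);
            helper.wakeup();
            long start = System.nanoTime();
            helper.select(1000L); // takes the selectNow() branch
            long elapsedMs = (System.nanoTime() - start) / 1_000_000L;
            System.out.println("select returned after " + elapsedMs + " ms");
        }
    }
}
```

Either branch performs a selection operation, so the selector's key set is refreshed in both cases; the flag only decides whether that operation may block.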

            johnnyv Jonathan Valliere added a comment

            chrjohn Please apply the patch for Option 3 to your workspace and run it through all of your tests. It passes the Unit Tests and the examples you provided in both DIRMINA-1076 and DIRMINA-1077.

            johnnyv Jonathan Valliere added a comment
            chrjohn Christoph John added a comment -

            Hi johnnyv, thanks for the patch. First tests looking good so far. I'll let our QuickFIX/J test suite run overnight and see what happens, but looks very promising.
            Thank you and cheers,
            Chris.


            Sounds good !

            Jonathan, let's wait for tomorrow's feedback from Christoph and if all is good, please commit the patch. I can then re-start the release process, unless there is anything else we want to fix/change in the code.

            Thanks !

            elecharny Emmanuel Lécharny added a comment
            chrjohn Christoph John added a comment -

            Hi guys, everything looking good on my end. Thanks for the quick turnaround and your help.
            Cheers,
            Chris.

            chrjohn Christoph John added a comment - - edited

            Just a question: does it make sense to add the tests I created for DIRMINA-1076 and DIRMINA-1077 to the project? They probably should be on @Ignore though since they run in an endless loop. OTOH when they are ignored they probably don't make sense to be included either.


            elecharny What branch should I apply the patch to when committing?
            chrjohn If you want, you can create a new request with the patch to enable your tests.

            johnnyv Jonathan Valliere added a comment

            Hi Jonathan,

            Apply the patch on 2.0.16, which is the latest revision of the 2.0 branch. We will make it 2.0.17 as soon as we are good to go.

            elecharny Emmanuel Lécharny added a comment

            The AprIoProcessor class extends the AbstractPollingIoProcessor class, thus should implement the newly added allSessionsCount() method.

            elecharny Emmanuel Lécharny added a comment

            Method added with commit f50b788ec7aa0eed948fa566a36aaf7824beeae0

            elecharny Emmanuel Lécharny added a comment
            githubbot ASF GitHub Bot added a comment -

            GitHub user chrjohn opened a pull request:

            https://github.com/apache/mina/pull/13

            Added tests for issues DIRMINA-1076 and DIRMINA-1077.

            You can merge this pull request into a Git repository by running:

            $ git pull https://github.com/chrjohn/mina 2.0

            Alternatively you can review and apply these changes as the patch at:

            https://github.com/apache/mina/pull/13.patch

            To close this pull request, make a commit to your master/trunk branch
            with (at least) the following in the commit message:

            This closes #13


            commit 4f9b2c609aff491702619258babd9ce51dbc1b11
            Author: chrjohn <christoph.john@...>
            Date: 2018-03-03T21:58:53Z

            Added tests for issues DIRMINA-1076 and DIRMINA-1077.



            People

              johnnyv Jonathan Valliere
              chrjohn Christoph John

                Time Tracking

                  Estimated: Not Specified
                  Remaining: 0h
                  Logged: 10m