  1. MINA
  2. DIRMINA-682

We need better documentation for the ExecutorFilter [was: Writing more than one message will block until messageReceived has been fully processed]

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Critical
    • Resolution: Not A Problem
    • Affects Version/s: 2.0.0-M4
    • Fix Version/s: 2.0.8
    • Component/s: None
    • Labels:
      None

      Description

      When a message generates more than one response, the written responses are sent to the client only once the initial message has been completely processed.

      Suppose we receive a message M. It is handled by an IoProcessor in its process() method and travels through the filter chain to the IoHandler.messageReceived() method. If the handler then writes more than one response (session.write( R )), those responses stay enqueued until control returns to the process() method.

      The cause is that the write is performed by the IoProcessor associated with the current session: we can't ask that IoProcessor instance to dequeue the written messages until it is done with the current processing (it runs in a single thread).

      The consequences are painful:

      • if the handler writes two responses and waits for the first one to be written, it ends in a deadlock, because it is waiting on the very processor it is holding
      • if the handler doesn't wait for the writes to complete, all the responses are enqueued and kept in memory until the IoProcessor leaves the read processing and starts processing the writes, which can lead to an OutOfMemoryError

      One solution would be to have two sets of IoProcessors, one for reads and one for writes. Another would be to pick a random processor for the writes, as long as it is not the one processing the reads.
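      The blocking described above, and the read/write split proposed as a fix, can be modelled without MINA at all. The sketch below is only an illustration under assumptions: plain single-threaded `ExecutorService` instances stand in for IoProcessors, and the class and method names (`ProcessorWaitDemo`, `writeFinishesDuringRead`, `sharedProcessor`, `separateProcessors`) are hypothetical, not part of MINA. A timeout replaces the real deadlock so the program terminates.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ProcessorWaitDemo {

    /**
     * Runs a "read" task on {@code reader}. The task queues a "write" on
     * {@code writer} and waits for it, as the handler in the issue does.
     * Returns true if the write finished while the read was still running.
     */
    static boolean writeFinishesDuringRead(ExecutorService reader, ExecutorService writer) {
        Future<Boolean> read = reader.submit(() -> {
            Future<?> write = writer.submit(() -> { /* pretend to flush */ });
            try {
                // A timeout stands in for the deadlock so the demo can end.
                write.get(500, TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException e) {
                return false;
            }
        });
        try {
            return read.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    /** One single-threaded "processor" handles both the read and the write. */
    static boolean sharedProcessor() {
        ExecutorService processor = Executors.newSingleThreadExecutor();
        try {
            return writeFinishesDuringRead(processor, processor);
        } finally {
            processor.shutdownNow();
        }
    }

    /** Reads and writes are handled by two different "processors". */
    static boolean separateProcessors() {
        ExecutorService reads = Executors.newSingleThreadExecutor();
        ExecutorService writes = Executors.newSingleThreadExecutor();
        try {
            return writeFinishesDuringRead(reads, writes);
        } finally {
            reads.shutdownNow();
            writes.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("shared processor:    " + sharedProcessor());
        System.out.println("separate processors: " + separateProcessors());
    }
}
```

      With a shared processor the write cannot start until the read task returns, so the wait always times out. With separate processors the write is expected to complete while the read task is still waiting.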

      Here is a sample exhibiting the problem. Launch it, run 'telnet localhost 8080' in a console and type something. It should write the typed message back twice, but instead it throws an exception (shown further down) and writes the message back once. Removing the wait makes it work, but the messages are sent only once the read has been processed in the AbstractPollingIoProcessor.process(T session) method:

          /**
           * Deal with session ready for the read or write operations, or both.
           */
          private void process(T session) {
              // Process Reads
              if (isReadable(session) && !session.isReadSuspended()) {
                  read(session);
              }
      
              // Process writes
              if (isWritable(session) && !session.isWriteSuspended()) {
                  scheduleFlush(session);
              }
          }
      
      

      The sample code :

      package org.apache.mina.real.life;
      
      import java.net.InetSocketAddress;
      
      import org.apache.mina.core.buffer.IoBuffer;
      import org.apache.mina.core.future.WriteFuture;
      import org.apache.mina.core.service.IoHandlerAdapter;
      import org.apache.mina.core.session.IoSession;
      import org.apache.mina.filter.logging.LoggingFilter;
      import org.apache.mina.transport.socket.SocketAcceptor;
      import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
      
      /**
       * (<b>Entry point</b>) Echo server
       *
       * @author The Apache MINA Project (dev@mina.apache.org)
       */
      public class Main {
          private static class EchoProtocolHandler extends IoHandlerAdapter {
              public void messageReceived(IoSession session, Object message)
                      throws Exception {
                  System.out.println(new String(((IoBuffer)message).array()));
                  // Write the received data back to remote peer
                  WriteFuture wf = session.write(((IoBuffer) message).duplicate());
                  
                  // Here, we will get a Deadlock detection
                  wf.awaitUninterruptibly();
                  
                  // Do a second write
                  session.write(((IoBuffer) message).duplicate());
              }
          }    
          /** Choose your favorite port number. */
          private static final int PORT = 8080;
      
          public static void main(String[] args) throws Exception {
              SocketAcceptor acceptor = new NioSocketAcceptor();
              
              // Add a logging filter
              acceptor.getFilterChain().addLast( "Logger", new LoggingFilter() );
              
              // Bind
              acceptor.setHandler(new EchoProtocolHandler());
              acceptor.bind(new InetSocketAddress(PORT));
      
              System.out.println("Listening on port " + PORT);
          }
      }
      

      The exception :

      Listening on port 8080
      [20:08:17] INFO [org.apache.mina.filter.logging.LoggingFilter] - CREATED
      [20:08:17] INFO [org.apache.mina.filter.logging.LoggingFilter] - OPENED
      [20:08:19] INFO [org.apache.mina.filter.logging.LoggingFilter] - RECEIVED: HeapBuffer[pos=0 lim=6 cap=2048: 74 65 73 74 0D 0A]
      test

      [20:08:30] WARN [org.apache.mina.filter.logging.LoggingFilter] - EXCEPTION :
      java.lang.IllegalStateException: DEAD LOCK: IoFuture.await() was invoked from an I/O processor thread. Please use IoFutureListener or configure a proper thread model alternatively.
      at org.apache.mina.core.future.DefaultIoFuture.checkDeadLock(DefaultIoFuture.java:235)
      at org.apache.mina.core.future.DefaultIoFuture.await0(DefaultIoFuture.java:203)
      at org.apache.mina.core.future.DefaultIoFuture.awaitUninterruptibly(DefaultIoFuture.java:131)
      at org.apache.mina.core.future.DefaultWriteFuture.awaitUninterruptibly(DefaultWriteFuture.java:114)
      at org.apache.mina.real.life.Main$EchoProtocolHandler.messageReceived(Main.java:45)
      at org.apache.mina.core.filterchain.DefaultIoFilterChain$TailFilter.messageReceived(DefaultIoFilterChain.java:722)
      at org.apache.mina.core.filterchain.DefaultIoFilterChain.callNextMessageReceived(DefaultIoFilterChain.java:434)
      at org.apache.mina.core.filterchain.DefaultIoFilterChain.access$1200(DefaultIoFilterChain.java:48)
      at org.apache.mina.core.filterchain.DefaultIoFilterChain$EntryImpl$1.messageReceived(DefaultIoFilterChain.java:802)
      at org.apache.mina.filter.logging.LoggingFilter.messageReceived(LoggingFilter.java:178)
      at org.apache.mina.core.filterchain.DefaultIoFilterChain.callNextMessageReceived(DefaultIoFilterChain.java:434)
      at org.apache.mina.core.filterchain.DefaultIoFilterChain.access$1200(DefaultIoFilterChain.java:48)
      at org.apache.mina.core.filterchain.DefaultIoFilterChain$EntryImpl$1.messageReceived(DefaultIoFilterChain.java:802)
      at org.apache.mina.core.filterchain.IoFilterAdapter.messageReceived(IoFilterAdapter.java:120)
      at org.apache.mina.core.filterchain.DefaultIoFilterChain.callNextMessageReceived(DefaultIoFilterChain.java:434)
      at org.apache.mina.core.filterchain.DefaultIoFilterChain.fireMessageReceived(DefaultIoFilterChain.java:426)
      at org.apache.mina.core.polling.AbstractPollingIoProcessor.read(AbstractPollingIoProcessor.java:604)
      at org.apache.mina.core.polling.AbstractPollingIoProcessor.process(AbstractPollingIoProcessor.java:564)
      at org.apache.mina.core.polling.AbstractPollingIoProcessor.process(AbstractPollingIoProcessor.java:553)
      at org.apache.mina.core.polling.AbstractPollingIoProcessor.access$400(AbstractPollingIoProcessor.java:57)
      at org.apache.mina.core.polling.AbstractPollingIoProcessor$Processor.run(AbstractPollingIoProcessor.java:892)
      at org.apache.mina.util.NamePreservingRunnable.run(NamePreservingRunnable.java:65)
      at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
      at java.lang.Thread.run(Thread.java:619)
      [20:08:30] WARN [org.apache.mina.real.life.Main$EchoProtocolHandler] - EXCEPTION, please implement org.apache.mina.real.life.Main$EchoProtocolHandler.exceptionCaught() for proper handling:
      java.lang.IllegalStateException: DEAD LOCK: IoFuture.await() was invoked from an I/O processor thread. Please use IoFutureListener or configure a proper thread model alternatively.
      at org.apache.mina.core.future.DefaultIoFuture.checkDeadLock(DefaultIoFuture.java:235)
      at org.apache.mina.core.future.DefaultIoFuture.await0(DefaultIoFuture.java:203)
      at org.apache.mina.core.future.DefaultIoFuture.awaitUninterruptibly(DefaultIoFuture.java:131)
      at org.apache.mina.core.future.DefaultWriteFuture.awaitUninterruptibly(DefaultWriteFuture.java:114)
      at org.apache.mina.real.life.Main$EchoProtocolHandler.messageReceived(Main.java:45)
      at org.apache.mina.core.filterchain.DefaultIoFilterChain$TailFilter.messageReceived(DefaultIoFilterChain.java:722)
      at org.apache.mina.core.filterchain.DefaultIoFilterChain.callNextMessageReceived(DefaultIoFilterChain.java:434)
      at org.apache.mina.core.filterchain.DefaultIoFilterChain.access$1200(DefaultIoFilterChain.java:48)
      at org.apache.mina.core.filterchain.DefaultIoFilterChain$EntryImpl$1.messageReceived(DefaultIoFilterChain.java:802)
      at org.apache.mina.filter.logging.LoggingFilter.messageReceived(LoggingFilter.java:178)
      at org.apache.mina.core.filterchain.DefaultIoFilterChain.callNextMessageReceived(DefaultIoFilterChain.java:434)
      at org.apache.mina.core.filterchain.DefaultIoFilterChain.access$1200(DefaultIoFilterChain.java:48)
      at org.apache.mina.core.filterchain.DefaultIoFilterChain$EntryImpl$1.messageReceived(DefaultIoFilterChain.java:802)
      at org.apache.mina.core.filterchain.IoFilterAdapter.messageReceived(IoFilterAdapter.java:120)
      at org.apache.mina.core.filterchain.DefaultIoFilterChain.callNextMessageReceived(DefaultIoFilterChain.java:434)
      at org.apache.mina.core.filterchain.DefaultIoFilterChain.fireMessageReceived(DefaultIoFilterChain.java:426)
      at org.apache.mina.core.polling.AbstractPollingIoProcessor.read(AbstractPollingIoProcessor.java:604)
      at org.apache.mina.core.polling.AbstractPollingIoProcessor.process(AbstractPollingIoProcessor.java:564)
      at org.apache.mina.core.polling.AbstractPollingIoProcessor.process(AbstractPollingIoProcessor.java:553)
      at org.apache.mina.core.polling.AbstractPollingIoProcessor.access$400(AbstractPollingIoProcessor.java:57)
      at org.apache.mina.core.polling.AbstractPollingIoProcessor$Processor.run(AbstractPollingIoProcessor.java:892)
      at org.apache.mina.util.NamePreservingRunnable.run(NamePreservingRunnable.java:65)
      at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
      at java.lang.Thread.run(Thread.java:619)
      [20:08:30] INFO [org.apache.mina.filter.logging.LoggingFilter] - SENT: HeapBuffer[pos=0 lim=6 cap=2048: 74 65 73 74 0D 0A]

          Activity

          elecharny Emmanuel Lecharny added a comment -

          The issue is that my initial logic was fucked up. The way to deal with the problem is to wait for the messageSent event to be received before writing the next message.
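          The idea of writing the next message only after the previous one has been sent, without blocking the processor thread, can be sketched with completion callbacks. This is a model under assumptions rather than MINA code: a single-threaded `ExecutorService` plays the IoProcessor, `CompletableFuture` callbacks play the role of the messageSent event (or of the IoFutureListener that the exception message recommends), and `ChainedWrites`, `echoTwice` and `write` are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ChainedWrites {

    /** Queues a "write" on the processor and returns a future completed once it is done. */
    static CompletableFuture<Void> write(ExecutorService processor, List<String> sent, String data) {
        return CompletableFuture.runAsync(() -> sent.add(data), processor);
    }

    /** Echoes the message twice; the second write is issued only after the first completed. */
    static List<String> echoTwice(String message) {
        ExecutorService processor = Executors.newSingleThreadExecutor();
        List<String> sent = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> allDone = new CompletableFuture<>();
        try {
            // The "handler": runs on the processor thread and never blocks on a write.
            processor.execute(() -> {
                CompletableFuture<Void> first = write(processor, sent, message + " (1)");
                // Callback instead of await(): fires once the first write is done.
                first.thenRun(() ->
                        write(processor, sent, message + " (2)")
                                .thenRun(() -> allDone.complete(null)));
            });
            // Only the caller's thread waits, never the processor thread.
            allDone.get(5, TimeUnit.SECONDS);
            return sent;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            processor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(echoTwice("test"));
    }
}
```

          Because the handler returns right after registering the callback, the single processor thread is free to perform the first write, and the second write is queued from the callback in order.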

          elecharny Emmanuel Lecharny added a comment -

          Postponed to 2.0.1

          vicnov Victor N added a comment -

          I agree, it would be great to document ExecutorFilter, and especially OrderedThreadPoolExecutor, in more detail!

          In test (2), what was your filter chain? Didn't you use the same OrderedThreadPoolExecutor for both MESSAGE_RECEIVED and WRITE operations? If I understand correctly, this could lead to a deadlock because we were already running MESSAGE_RECEIVED and tried to do a WRITE.

          elecharny Emmanuel Lecharny added a comment -

          OK, after running more tests and discussing with Maarten and Alex, I don't think it's a bug, but something I missed.

          However, I am keeping the JIRA open, with a different status, title and urgency. It is now an improvement request for better documentation.

          elecharny Emmanuel Lecharny added a comment -

          One more data point: if I don't restrict the ExecutorFilter to WRITE events only, it works just fine.

          elecharny Emmanuel Lecharny added a comment -

          More information, after running more tests.

          If I add an ExecutorFilter to the chain, things are different.

          1) If I add

              // Add an executor filter
              acceptor.getFilterChain().addLast( "Executor", new ExecutorFilter() );

          everything works fine.

          2) If I add

              acceptor.getFilterChain().addLast( "Executor", new ExecutorFilter(
                  new OrderedThreadPoolExecutor( 2 ),
                  IoEventType.WRITE ) );

          then I get a deadlock exception.

          3) If I add

              acceptor.getFilterChain().addLast( "Executor", new ExecutorFilter(
                  new UnorderedThreadPoolExecutor( 2 ),
                  IoEventType.WRITE ) );

          then I also get the deadlock exception.

          So there is something strange happening in the ThreadPoolExecutor...


            People

            • Assignee: Unassigned
            • Reporter: elecharny Emmanuel Lecharny