MINA / DIRMINA-1142

Missing messages when sending asynchronously

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 2.1.3, 2.1.4
    • Fix Version/s: 2.2.0
    • Component/s: Core
    • Labels: None
    • Environment: Linux

    Description

      I found an issue on the 2.1.x branch when a client sends messages asynchronously.

      I created a simple client and server that use an ObjectSerializationCodecFactory.
      On the client, I send simple String messages (e.g. "Message:1") from a thread pool (size=2).
      Then I check:

      • On the client, that every message is sent exactly once (no duplicates).
      • On the server, that every message is received.

      When the client sends 1000 messages from a thread pool with 2 threads, the server misses 11 messages:

      12:24:59,573 INFO <main> missingMessageTest.begin with 1000 messages and 2 threads
      12:24:59,577 INFO <main> missingMessageTest.end
      12:24:59,577 INFO <main> missingMessageTest.sleep... 1000
      12:24:59,580 ERROR <NioProcessor-14> messageSent: message <Message:21> already sent
      12:24:59,581 ERROR <NioProcessor-14> messageSent: message <Message:27> already sent
      12:24:59,584 ERROR <NioProcessor-14> messageSent: message <Message:113> already sent
      12:24:59,587 ERROR <NioProcessor-14> messageSent: message <Message:164> already sent
      12:24:59,592 ERROR <NioProcessor-14> messageSent: message <Message:313> already sent
      12:24:59,596 ERROR <NioProcessor-14> messageSent: message <Message:386> already sent
      12:24:59,604 ERROR <NioProcessor-14> messageSent: message <Message:567> already sent
      12:24:59,605 ERROR <NioProcessor-14> messageSent: message <Message:615> already sent
      12:24:59,606 ERROR <NioProcessor-14> messageSent: message <Message:628> already sent
      12:24:59,611 ERROR <NioProcessor-14> messageSent: message <Message:760> already sent
      12:24:59,612 ERROR <NioProcessor-14> messageSent: message <Message:822> already sent
      12:25:00,578 INFO <main> missingMessageTest.sleep... 2000
      12:25:01,578 INFO <main> missingMessageTest.sleep... 3000
      12:25:02,579 INFO <main> missingMessageTest.sleep... 4000
      12:25:03,579 INFO <main> missingMessageTest.sleep... 5000
      12:25:04,579 INFO <main> missingMessageTest.close
      12:25:04,582 ERROR <main> missing <11> messages : [Message:760, Message:27, Message:21, Message:313, Message:567, Message:822, Message:615, Message:628, Message:386, Message:113, Message:164]
      

      It works fine with one thread (i.e. synchronous sends).
      This JUnit test passes with Apache MINA 2.0.21 but fails on the 2.1.x branch.

      I have attached my JUnit test so you can reproduce this issue on your side.
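      The check described above can be sketched without MINA to show the bookkeeping. In this minimal sketch, `send` is a hypothetical stand-in for the real `session.write(message)` call, and the `received` and `sent` sets stand in for the server's `messageReceived` and the client's `messageSent` handlers; only the message format, the count (1000) and the pool size (2) come from the report. Because the stand-in transport cannot lose messages, it reports nothing missing; against the affected 2.1.x code, the attached test reported 11.

```java
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the reporter's ClientServerTest; no MINA involved.
public class MissingMessageCheck {
    static final int LOOP = 1000;  // message count from the report
    static final int THREADS = 2;  // thread pool size from the report

    // "Server" side: every message that arrives.
    static final Set<String> received = ConcurrentHashMap.newKeySet();
    // "Client" side: mirrors the reporter's messageSent() duplicate check.
    static final Set<String> sent = ConcurrentHashMap.newKeySet();
    static final Queue<String> duplicates = new ConcurrentLinkedQueue<>();

    // Hypothetical transport standing in for session.write(message).
    static void send(String message) {
        received.add(message);
        messageSent(message);
    }

    // Should fire once per message; a second call corresponds to "already sent" in the log.
    static void messageSent(String message) {
        if (!sent.add(message)) {
            duplicates.add(message);
        }
    }

    // Sends LOOP messages from THREADS threads and returns the ones never received.
    static Set<String> run() {
        ExecutorService pool = Executors.newFixedThreadPool(THREADS);
        Set<String> expected = new HashSet<>();
        for (int i = 1; i <= LOOP; i++) {
            String message = "Message:" + i;
            expected.add(message);
            pool.execute(() -> send(message));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("senders did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        Set<String> missing = new HashSet<>(expected);
        missing.removeAll(received);
        return missing;
    }

    public static void main(String[] args) {
        Set<String> missing = run();
        System.out.println("missing <" + missing.size() + "> messages : " + missing);
        System.out.println("duplicates: " + duplicates);
    }
}
```

      In the real test, `send` would be `IoSession.write` and the two sets would be filled from the client and server `IoHandler` callbacks.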


      Attachments

        1. ClientServerTest.java (6 kB, Maxime Leur)


          Activity

            Jonathan Valliere (johnnyv) added a comment:

            I want to note that, AFAIK, the ProtocolCodec package is not necessarily intended to be thread-safe. While it does use some concurrent structures like ConcurrentLinkedDeque, it also uses session-specific encoder/decoder instances as well as a ProtocolDecoderOutput.

            I could delegate the retrieval of the ProtocolEncoderOutput to the ProtocolCodecFactory. This would put thread-safety more in the hands of the specific implementation and less in the hands of the framework.

            elecharny, what do you think about the suggested API change? Adding the default method to the abstract class should avoid breaking backward compatibility.
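            The proposed API change can be sketched as follows. Every name here (`CodecFactory`, `EncoderOutput`, `getEncoderOutput`) is a hypothetical stand-in, not MINA's actual `ProtocolCodecFactory`/`ProtocolEncoderOutput` API, and `session` is a plain `Object` standing in for an `IoSession`. The sketch uses a Java interface default method; a concrete method on an abstract base class gives existing subclasses the same behavior. An implementation written before the hook existed compiles unchanged, while another can choose its own thread-safety policy.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class CodecFactorySketch {
    // Hypothetical encoder output: buffers encoded messages until they are flushed.
    static final class EncoderOutput {
        final Queue<Object> queue = new ArrayDeque<>();

        void write(Object encoded) {
            queue.add(encoded);
        }
    }

    // Hypothetical factory; 'session' stands in for an IoSession.
    interface CodecFactory {
        // Placeholder for the factory's existing methods.
        String name();

        // New hook with a default body, so existing implementations still compile.
        // The default returns a fresh output per call, so two threads encoding
        // at the same time never share one queue.
        default EncoderOutput getEncoderOutput(Object session) {
            return new EncoderOutput();
        }
    }

    // Written before the hook existed: unchanged and still valid.
    static final class LegacyFactory implements CodecFactory {
        @Override
        public String name() {
            return "legacy";
        }
    }

    // Picks its own policy: one reusable output per thread.
    static final class PerThreadFactory implements CodecFactory {
        private final ThreadLocal<EncoderOutput> outputs = ThreadLocal.withInitial(EncoderOutput::new);

        @Override
        public String name() {
            return "per-thread";
        }

        @Override
        public EncoderOutput getEncoderOutput(Object session) {
            return outputs.get();
        }
    }
}
```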

            Emmanuel Lécharny (elecharny) added a comment:

            The ProtocolCodec is not intended to be thread safe.

            Give me a bit of time to review your proposal.

            FTR, I never really liked the current implementation. The fact that the encoder/decoder is always shared across the whole chain is problematic. OTOH, we assume the protocol implementer is aware that those two classes have to be thread safe (which makes things quite complicated when you have to deal with fragmentation).
            Instantiating the encoder/decoder for every new session would not necessarily cost more, nor would it kill performance, and it would guarantee thread safety, IMHO.
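            A minimal sketch of the per-session alternative, assuming a hypothetical `Session` class with a concurrent attribute map (standing in for `IoSession` attributes) and a hypothetical stateful `Decoder`: the shared filter keeps only a factory and lazily creates one decoder per session, so no decoder instance is shared between sessions and each one becomes garbage along with its session.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

public class PerSessionCodec {
    // Hypothetical stand-in for IoSession: just an attribute map.
    static final class Session {
        final Map<String, Object> attributes = new ConcurrentHashMap<>();
    }

    // Hypothetical stateful decoder; it may keep partial data in its own fields.
    static final class Decoder {
        final StringBuilder pending = new StringBuilder();
    }

    static final String DECODER_KEY = "decoder";

    // The shared filter holds only this factory, never a decoder instance.
    private final Supplier<Decoder> factory;

    PerSessionCodec(Supplier<Decoder> factory) {
        this.factory = factory;
    }

    // computeIfAbsent is atomic on ConcurrentHashMap, so two threads racing on
    // a brand-new session still end up with a single decoder for it.
    Decoder decoderFor(Session session) {
        return (Decoder) session.attributes.computeIfAbsent(DECODER_KEY, k -> factory.get());
    }
}
```

            This isolates sessions from each other; it does not by itself serialize concurrent calls on the same session.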

            Jonathan Valliere (johnnyv) added a comment:

            The current encoders/decoders all establish session-local object holders, the same way most filters do. The problem is concurrency: a single session-local object and workflow is shared by every thread writing to that session. The TextLineCodecFactory, for example, does not work concurrently because its CharsetEncoder becomes corrupted. Creating a new CharsetEncoder for every call into the API makes it thread-safe but creates more garbage. We almost need an IoSession API for creating thread-local objects from a factory, so that those objects are garbage collected along with the IoSession.
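            The trade-off between a shared `CharsetEncoder`, one per call, and one per thread can be shown with plain JDK classes. `CharsetEncoder` instances are documented as not safe for use by multiple concurrent threads. This illustrative `TextEncoderSketch` (not MINA's text-line encoder) offers both the allocate-per-call variant and a per-thread variant built with `ThreadLocal.withInitial`. It does not tie the encoders' lifetime to a session, which is the part an `IoSession` API would still be needed for.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.StandardCharsets;

public class TextEncoderSketch {
    // One encoder per thread: reused across calls, never shared between threads.
    private static final ThreadLocal<CharsetEncoder> ENCODER =
            ThreadLocal.withInitial(StandardCharsets.UTF_8::newEncoder);

    // Thread-safe but allocation-heavy: a fresh encoder for every call.
    static byte[] encodeFresh(String text) {
        return encode(StandardCharsets.UTF_8.newEncoder(), text);
    }

    // Reuses the calling thread's encoder.
    static byte[] encodeThreadLocal(String text) {
        return encode(ENCODER.get(), text);
    }

    // CharsetEncoder.encode(CharBuffer) resets the encoder, encodes and flushes,
    // so nothing left over from a previous call on this thread leaks in.
    private static byte[] encode(CharsetEncoder encoder, String text) {
        try {
            ByteBuffer buffer = encoder.encode(CharBuffer.wrap(text));
            byte[] bytes = new byte[buffer.remaining()];
            buffer.get(bytes);
            return bytes;
        } catch (CharacterCodingException e) {
            throw new IllegalArgumentException("text cannot be encoded as UTF-8", e);
        }
    }
}
```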

            Emmanuel Lécharny (elecharny) added a comment:

            Clearly, using TLS (thread-local storage) would avoid some of the problems we have, but objects stored in TLS are also shared across sessions, which means you would have to keep a map of sessions in the TLS.

            In any case, when dealing with fragmentation (i.e. when you have to wait for more data before the decoding can proceed), you have to store something in the session anyway. This is why I suggest having a decoder instance per session: it can hold that data and will be GCed when the session dies.

            All in all, if the encoders/decoders are well written, i.e. stateless, we should be safe. That means they have to store partial data in the session object.
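            A minimal sketch of a stateless decoder in that sense, assuming a hypothetical `Session` attribute map and a newline-delimited text protocol (neither is MINA's API; it works on Strings rather than bytes for brevity). The decoder has no fields; whatever incomplete line is left over from one chunk is stored in the session until the next chunk arrives, so a single decoder instance can serve many sessions.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class StatelessLineDecoder {
    // Hypothetical stand-in for IoSession attributes.
    static final class Session {
        final Map<String, Object> attributes = new ConcurrentHashMap<>();
    }

    static final String PARTIAL_KEY = "decoder.partial";

    // Appends every complete line in (pending + chunk) to 'out'. The unfinished
    // tail, if any, is kept in the session, never in the decoder itself.
    void decode(Session session, String chunk, List<String> out) {
        String pending = (String) session.attributes.getOrDefault(PARTIAL_KEY, "");
        String data = pending + chunk;
        int start = 0;
        int newline;
        while ((newline = data.indexOf('\n', start)) >= 0) {
            out.add(data.substring(start, newline));
            start = newline + 1;
        }
        String rest = data.substring(start);
        if (rest.isEmpty()) {
            session.attributes.remove(PARTIAL_KEY);
        } else {
            session.attributes.put(PARTIAL_KEY, rest);
        }
    }
}
```

            Feeding `"hel"` and then `"lo\nfoo"` for one session yields `"hello"` and leaves `"foo"` in that session, independently of any other session. The sketch still assumes that decode calls for any one session arrive one at a time and in order.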

            Jonathan Valliere (johnnyv) added a comment:

            Fragmentation on the receive side would be an issue. Adding a thread pool before the protocol filter would create chaos when trying to reconstruct fragmented messages in order. The ProtocolCodec core API should be stateless, but it is not: the EncoderOutput object makes it stateful, because messages are enqueued there and then flushed in aggregate.

            I had to remove the WriteRequest object-reuse code entirely in order to fix the bug discussed above. It now creates a new EncodedWriteRequest object for every output object, to prevent multiple threads from trying to detect the end of the session-local EncodeOutput queue. The easiest option might be to make the EncodeOutput thread-local and attach it to the session. That way both the EncodeOutput and the WriteRequest could be reused, with no performance degradation between single-threaded and multi-threaded use.
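            The two ideas in this comment can be combined in a small sketch, assuming hypothetical `EncodedWriteRequest` and encoder-output types (MINA's real classes and its actual fix are not reproduced). Each writing thread appends to its own thread-local output queue, and every drained item is wrapped in a new, immutable write request instead of a reused one, so no thread can observe or consume another thread's pending output. One instance of the class is assumed per session, standing in for "attached to the session".

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

public class ThreadLocalEncodeOutput {
    // Hypothetical immutable write request: a new one per encoded piece.
    static final class EncodedWriteRequest {
        final Object original;
        final Object encoded;

        EncodedWriteRequest(Object original, Object encoded) {
            this.original = original;
            this.encoded = encoded;
        }
    }

    // One reusable output queue per writing thread.
    private final ThreadLocal<Queue<Object>> output = ThreadLocal.withInitial(ArrayDeque::new);

    // Encodes into this thread's queue, then drains only that queue.
    void write(Object message, Consumer<EncodedWriteRequest> sink) {
        Queue<Object> queue = output.get();
        queue.add("encoded:" + message); // stand-in for the real encoder
        Object piece;
        while ((piece = queue.poll()) != null) {
            sink.accept(new EncodedWriteRequest(message, piece));
        }
    }

    // Demo helper: 'threads' threads each write 'perThread' distinct messages.
    static Queue<EncodedWriteRequest> runConcurrently(int threads, int perThread) {
        ThreadLocalEncodeOutput codec = new ThreadLocalEncodeOutput();
        Queue<EncodedWriteRequest> sink = new ConcurrentLinkedQueue<>();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            int id = t;
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    codec.write("Message:" + id + "-" + i, sink::add);
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return sink;
    }
}
```

            With two threads writing 500 messages each, every message yields exactly one request carrying its own encoded payload.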

            Maxime Leur (maxime.leur) added a comment:

            Hi,

            Thanks for the fix.
            However, I have another issue when I run my JUnit test against the new code: the IoHandlerAdapter.messageSent method in the client (the side that sends the messages) is no longer called.

            private static class ClientHandler extends IoHandlerAdapter {
                private Set<String> messages = new HashSet<>(LOOP);
                private AtomicInteger count = new AtomicInteger(0);

                @Override
                public void messageSent(IoSession session, Object message) throws Exception {
                    // ... (body elided in the original comment)
                }
            }
            

            Regards,
            Maxime


            Jonathan Valliere (johnnyv) added a comment:

            ProtocolCodec needs more changes to work concurrently without breaking other dependencies.

            Jonathan Valliere (johnnyv) added a comment:

            Okay, I'm working on some API changes to the codec to fully support parallel encoding and decoding. Check out the branch: https://gitbox.apache.org/repos/asf?p=mina.git;a=shortlog;h=refs/heads/bugfix/DIRMINA-1142

            Jonathan Valliere (johnnyv) added a comment:

            Is there an update here? Otherwise, I'm going to push this to the 2.2.X branch.

            Jonathan Valliere (johnnyv) added a comment:

            There hasn't been any activity on this issue for a while. The changes have been pushed to https://gitbox.apache.org/repos/asf?p=mina.git;a=shortlog;h=refs/heads/2.2.X

            People

              Assignee: Jonathan Valliere (johnnyv)
              Reporter: Maxime Leur (maxime.leur)
              Votes: 0
              Watchers: 3
