MINA / DIRMINA-845

ProtocolEncoderOutputImpl isn't thread-safe

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.0.4
    • Fix Version/s: None
    • Component/s: Filter
    • Labels: None

      Description

      ProtocolEncoderOutputImpl uses ConcurrentLinkedQueue, and at first glance it appears to be thread-safe. In reality, concurrent execution of the flush method is not thread-safe (and neither is write/mergeAll).

      For example, in RTMP several channels are multiplexed in a single connection. According to the protocol specification, it's possible to write to different channels concurrently, but that doesn't work with MINA.
      I've synchronized channel writing, but that doesn't prevent concurrent runs of flushing (in 2.0.4 flushing is done directly in ProtocolCodecFilter.filterWrite, but ProtocolEncoderOutputImpl.flush has the same problem).

      Here is the relevant fragment of the flushing code:

      while (!bufferQueue.isEmpty()) {
          Object encodedMessage = bufferQueue.poll();

          if (encodedMessage == null) {
              break;
          }

          // Flush only when the buffer has remaining.
          if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) {
              SocketAddress destination = writeRequest.getDestination();
              WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination);
              nextFilter.filterWrite(session, encodedWriteRequest);
          }
      }

      Suppose the original packet sequence is A, B, ...
      A concurrent run of flushing may proceed as follows:

      thread-1: Object encodedMessage = bufferQueue.poll(); // gets A packet
      thread-2: Object encodedMessage = bufferQueue.poll(); // gets B packet
      ...
      thread-2: nextFilter.filterWrite(...); // writes B packet
      thread-1: nextFilter.filterWrite(...); // writes A packet

      so the resulting sequence will be B, A.

      This is quite a confusing result, especially since the documentation contains no explanation of this behavior.
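      This interleaving can be forced deterministically outside MINA. A minimal standalone sketch (the class name, latches, and the "wire" list are invented for illustration; this is not MINA code):

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

// Forces the interleaving described above: each poll() is atomic, but the
// poll-then-filterWrite sequence is not, so the downstream order can flip.
public class FlushRaceDemo {
    static List<String> run() throws InterruptedException {
        Queue<String> bufferQueue = new ConcurrentLinkedQueue<>(List.of("A", "B"));
        List<String> wire = new CopyOnWriteArrayList<>(); // what the next filter sees
        CountDownLatch t1Polled = new CountDownLatch(1);
        CountDownLatch t2Wrote = new CountDownLatch(1);

        Thread t1 = new Thread(() -> {
            String msg = bufferQueue.poll(); // thread-1 gets packet A
            t1Polled.countDown();
            await(t2Wrote);                  // "preempted" until thread-2 has written
            wire.add(msg);                   // writes A last
        });
        Thread t2 = new Thread(() -> {
            await(t1Polled);
            String msg = bufferQueue.poll(); // thread-2 gets packet B
            wire.add(msg);                   // writes B first
            t2Wrote.countDown();
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        return wire;
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // [B, A] - reversed, as in the trace above
    }
}
```

      The latches only replace scheduler luck with a guaranteed schedule; under real load the same reordering happens nondeterministically.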

        Activity

        Luigi Bitonti added a comment -

        Hi Ilya,

        Interesting report. Would you be able to share some details of your solution?

        Thanks,
        Luigi

        Ilya Ivanov added a comment -

        Hi, Dominic!

        I've read your post about the deadlock issue. Actually, we didn't face the deadlock problem because we use asynchronous message dispatching.
        I tried to eliminate the locks in encode/decode for a different reason, but it looks like my solution might solve your problem too.

        I sent you an e-mail with some details.

        Dominic Williams added a comment -

        Ilya, perhaps we can coordinate to create a solution for Red5.

        Take a look at a related issue about how serialization here can lead to deadlock: http://code.google.com/p/red5/issues/detail?id=164

        Ilya Ivanov added a comment -

        We are using red5 r4047 with MINA 2.0.0 RC1, and I couldn't just set the "encoderOut" property because an explicit type cast to ProtocolEncoderOutputImpl is present in ProtocolCodecFilter.filterWrite.

        In MINA 2.0.4 the situation is a bit better - the cast is to AbstractProtocolEncoderOutput. Of course, I can extend AbstractProtocolEncoderOutput for my implementation, but that still looks like a workaround, not a regular way.

        I suppose ProtocolCodecFilter.filterWrite should look like this:

        // ...
        try {
            // Now we can try to encode the response
            encoder.encode(session, message, encoderOut);

            // Send it directly
            if (encoderOut instanceof Flushable) {
                ((Flushable) encoderOut).flush();
            }

            // Call the next filter
            nextFilter.filterWrite(session, new MessageWriteRequest(writeRequest));
        } catch (Throwable t) {
            // ...
        }

        and ProtocolCodecFilter.getEncoderOut

        protected ProtocolEncoderOutput getEncoderOut(IoSession session,
                NextFilter nextFilter, WriteRequest writeRequest) {
            // Default implementation, which might be overridden in derived classes
            ProtocolEncoderOutput out = (ProtocolEncoderOutput) session.getAttribute(ENCODER_OUT);

            if (out == null) {
                // Create a new instance, and store it in the session
                out = new ProtocolEncoderOutputImpl(session, nextFilter, writeRequest);
                session.setAttribute(ENCODER_OUT, out);
            }

            return out;
        }

        And a few more words about the ConcurrentLinkedQueue in ProtocolCodecFilter.

        Emmanuel, earlier you wrote: "We use a CLQ because you may have many messages written for one single session by many threads, if you add an executor in the chain."

        I looked at the Executors too. With OrderedThreadPoolExecutor everything works fine because it maintains event order per session, but if we insert an UnorderedThreadPoolExecutor before ProtocolCodecFilter, we might get into the trouble described in this issue. Of course, you may say "if you set an unordered executor, then event order doesn't matter to you, and it's OK that flushing distorts the order too". But there is an interesting aspect here. The messages processed by the executor (before encoding) and the messages written to nextFilter via ProtocolEncoderOutput (after encoding) might have different meanings. Back to my example: a before-encode message is a video frame, and the order of entire frames doesn't matter; an after-encode message is a video frame chunk, and the order of chunks does matter!

        So, I just want to say that when I write to ProtocolEncoderOutput, I expect the next filter to receive the messages in the same order they were written (possibly mixed with other messages, but with relative order preserved) - yet flush may distort this relative order!

        I understand this example may look quite artificial, and I'm not sure it should be fixed in the default implementation, but such behavior must at least be reflected in the docs, and there must be a regular way to provide a custom implementation of ProtocolEncoderOutput for such cases.

        Paul Gregoire added a comment -

        Ilya, I hope to see a patch for Red5 when you get this sorted out.

        Emmanuel Lecharny added a comment -

        Looking at the current code, there is an (undocumented) way to switch the ProtocolEncoderOutput class: you can set the "encoderOut" session property to your implementation of the ProtocolEncoderOutput interface. This has to be done when the SessionCreated event is processed in your handler.

        It should work...

        Ilya Ivanov added a comment -

        I'm pleased we understand each other at last!
        Sure, I'll test it when it's ready.
        Thank you!

        Emmanuel Lecharny added a comment -

        Ilya, this is a pretty damn good suggestion.

        Being able to override the queue instead of sub-classing the whole filter would have saved you a lot of time.

        We can work such a modification into the MINA code base, making it easier to deal with thread safety. I'll try to patch the code; if you can test it (it will be in a branch), that would be great!

        Emmanuel Lecharny added a comment -

        This problem should be mitigated by modifying the codec filter and making the queue thread-safe.

        The best solution I foresee would be to copy the current code, add a mutex around the queue to protect it, and use this filter in your chain.

        It won't be a MINA filter though, unless we add it as a ThreadSafeProtocolCodecFilter.

        Currently, I see no other solution...
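        A minimal standalone sketch of that mutex-around-the-queue idea, assuming the CLQ is kept for lock-free writes and only the drain is serialized (all names are hypothetical, and a plain list stands in for the next filter; this is not the actual MINA filter):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Writers still enqueue without locking; only the drain (flush) is serialized,
// so two concurrent flushes can no longer interleave their poll/write pairs.
public class SynchronizedFlushOutput {
    private final Queue<String> bufferQueue = new ConcurrentLinkedQueue<>();
    private final List<String> wire = new ArrayList<>(); // stand-in for the next filter
    private final Object flushMutex = new Object();

    public void write(String encodedMessage) {
        bufferQueue.add(encodedMessage); // unchanged: CLQ handles concurrent writes
    }

    public void flush() {
        synchronized (flushMutex) { // the added mutex: the drain is now atomic
            String msg;
            while ((msg = bufferQueue.poll()) != null) {
                wire.add(msg);
            }
        }
    }

    public List<String> written() {
        synchronized (flushMutex) {
            return new ArrayList<>(wire);
        }
    }
}
```

        The trade-off Emmanuel mentions applies: the mutex serializes all flushes on one output, which is only a cost when an executor actually makes them concurrent.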

        Ilya Ivanov added a comment -

        Actually, I already solved this problem in my particular case by sub-classing ProtocolCodecFilter and providing a custom implementation of ProtocolEncoderOutput.

        And I was disappointed: there is no easy way to provide a custom ProtocolEncoderOutput implementation. I was forced to override filterWrite and messageSent and to re-implement some nested classes, when all I really need is for getEncoderOut to be protected and work as a ProtocolEncoderOutput factory.

        Ilya Ivanov added a comment -

        "The only reason you might send those messages in the wrong order is that they are read from the queue by two different threads"

        Exactly this case! flush itself reads sequentially, but two concurrent flushes run at the same moment because two threads are writing to session 3.

        Emmanuel Lecharny added a comment -

        I forgot to mention that, yes, the Javadoc sucks :/

        Also note I'm not arguing with you about the issue; I'm really trying to understand your use case, to see whether MINA should be modified in one way or another, or whether we just need to document the code better.

        Emmanuel Lecharny added a comment -

        Still puzzled...

        If the chunks are written to the CLQ in the right order, there is no reason for those chunks to be sent in another order, as the queue is read sequentially in the flush() method.

        The only reason you might send those messages in the wrong order is that they are read from the queue by two different threads, which should not be the case...

        Am I missing something?

        Ilya Ivanov added a comment - edited

        I understand there are reasons to leave the class only partially thread-safe, but I think this should be reflected in the Javadoc at least.

        "What you are saying is that the frames coming from session 1 in the order (A,B) are pushed in this order to session 3, but are sometimes transmitted in the reverse order (B,A), right?"

        Not exactly, but close. A and B aren't two frames; they are chunks of one frame. A chunk equals a message in MINA terms.
        One thread splits a frame into chunks and writes them to the output (chunk order is correct here). Another, concurrent thread does the same with another frame but writes to the same output. Everything is correct until both threads run flush on the same output.

        public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws ProtocolCodecException {
            // ...
            final IoBuffer buf = encoder.encode(state, message);
            if (buf != null) {
                Chunker chunker = new Chunker(buf, state.getWriteChunkSize(), 2048);
                while (chunker.hasNext()) {
                    out.write(chunker.next());
                }
            }
            // ...
            out.flush(); // <- problem HERE
        }

        Here 'session' is session 3, and encode runs concurrently in session 1's thread and session 2's thread.

        Emmanuel Lecharny added a comment -

        About the frame order:

        • I think I misunderstood the issue. What you are saying is that the frames coming from session 1 in the order (A,B) are pushed in this order to session 3, but are sometimes transmitted in the reverse order (B,A), right?

        How do you write the frames to session 3?

        Emmanuel Lecharny added a comment -

        We use a CLQ because you may have many messages written for one single session by many threads, if you add an executor in the chain. This queue is a guarantee that we don't lose messages.

        Making this class thread-safe would create a bottleneck that would kill any heavily loaded server, when most of the time you don't have an executor. It's enough to make sure that each session's messages are processed by the same thread all the time. Or you can use the OrderedThreadPoolExecutor, which guarantees that messages are processed in the right order when you add an executor.

        Ilya Ivanov added a comment - edited

        "In your use case, you are assuming that frames coming from session 1 and from session 2 are in sync"

        No, frames come asynchronously. The order of the frames themselves doesn't matter, but the order of the chunks of one frame does matter.
        And I have ordered the chunks when writing to ProtocolEncoderOutput; in bufferQueue the chunks are placed in the right order, but when flushing, the order is distorted.

        Ilya Ivanov added a comment -

        In general, it isn't so important what exactly causes a concurrent run of the flush method. I just don't understand why ConcurrentLinkedQueue is used in ProtocolEncoderOutputImpl if it doesn't make this class thread-safe?!

        One more example: a concurrent run of write and mergeAll may lead to a BufferOverflowException.
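        That write/mergeAll race can be modeled in a single thread by performing the "concurrent" write between the sizing pass and the draining pass. A sketch of the failure mode (a hypothetical model of mergeAll-style logic, not the actual MINA code):

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// mergeAll-style logic: size the merged buffer from a snapshot of the queue,
// then drain. A write() landing between the two steps overflows the buffer.
public class MergeAllRaceModel {
    static boolean overflows() {
        Queue<ByteBuffer> bufferQueue = new ConcurrentLinkedQueue<>();
        bufferQueue.add(ByteBuffer.allocate(4));

        int sum = 0; // step 1: compute the total size from a snapshot
        for (ByteBuffer b : bufferQueue) {
            sum += b.remaining();
        }

        bufferQueue.add(ByteBuffer.allocate(4)); // the concurrent write() lands here

        ByteBuffer merged = ByteBuffer.allocate(sum); // sized for the stale snapshot
        try {
            ByteBuffer b;
            while ((b = bufferQueue.poll()) != null) {
                merged.put(b); // step 2: drain - the second buffer no longer fits
            }
            return false;
        } catch (BufferOverflowException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(overflows()); // true
    }
}
```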

        Emmanuel Lecharny added a comment -

        In your use case, you are assuming that frames coming from session 1 and from session 2 are in sync (i.e., the order in which they arrive must be respected).

        This is not a MINA issue; it's up to your handler to deal with such a scenario. You have to order the frames before writing them to session 3. I.e., you should wait for frame 2A when receiving frame 2B before you can send 2B.

        Ilya Ivanov added a comment - edited

        Let's suppose we have 3 sessions (3 RTMP connections): 1, 2 and 3. Session 1 receives a video frame and re-sends it to session 3; session 2 receives another video frame and re-sends it to session 3 too. All sessions have different io-processor threads.

        Note that a video frame is represented as a sequence of several messages in MINA terms (chunks). A frame is forwarded once it has been received completely. When sending, it is split into chunks again.

        So, two incoming frames processed by different threads will concurrently pass through session 3's filter chain, chunk by chunk. Exactly this chunking leads to the issue above: flushing may break the chunk order.

        Let frame F1 be chunked as F1A, F1B and F2 as F2A, F2B. They might be written to ProtocolEncoderOutput in the order F1A, F2A, F2B, F1B, for example. It's OK that chunks of different frames are mixed, but it's important that the B chunk follows the A chunk for each frame. When flush runs concurrently, the above sequence might be written to the next filter as F1A, F2B, F2A, F1B - wrong: F2B precedes F2A!
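        This ordering invariant can be stated as a small check. A hypothetical helper, using the F1A-style chunk names from the example above:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// An interleaving is acceptable iff each frame's chunks keep their original
// relative order; chunks of different frames may be freely mixed.
public class ChunkOrderCheck {
    static boolean relativeOrderPreserved(List<String> wire) {
        Map<Character, Character> lastChunk = new HashMap<>();
        for (String chunk : wire) {           // e.g. "F1A": frame '1', chunk 'A'
            char frame = chunk.charAt(1);
            char pos = chunk.charAt(2);
            Character prev = lastChunk.put(frame, pos);
            if (prev != null && prev >= pos) {
                return false;                 // a later chunk overtook an earlier one
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(relativeOrderPreserved(List.of("F1A", "F2A", "F2B", "F1B"))); // true
        System.out.println(relativeOrderPreserved(List.of("F1A", "F2B", "F2A", "F1B"))); // false
    }
}
```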

        I'm not sure whether this is the right use of MINA or not, but I saw such an implementation in red5...

        http://code.google.com/p/red5/source/browse/java/server/trunk/src/org/red5/server/net/rtmp/codec/RTMPMinaProtocolEncoder.java lines 50 and 70

        I tried to eliminate the whole-connection lock and replace it with a per-channel lock (RTMP itself allows this). I also added chunking, which wasn't present in the original code.

        Emmanuel Lecharny added a comment -

        Ok,

        when you send a message, you do it using the thread that was used to process the incoming request. This thread was selected when the session on which this message arrived was activated.

        If you haven't changed anything (like adding an executor in the chain), then an incoming message for a specific session will always use the same thread, so there is no reason message B can be written before message A, as the thread isn't available before message A is injected into the queue.

        That's the theory, and trust me, it works. Now, since I don't have the code you are playing with, I can't explain why you are seeing concurrency issues.

        Ilya Ivanov added a comment - edited

        Actually, it isn't my chain. I found this problem while trying to optimize the red5 media server.
        AFAIU, in red5 a "serial" chain is used, i.e. all filters are called in the writer thread (there are no other executors).

        Emmanuel Lecharny added a comment -

        Are you using an executor in your chain?


          People

          • Assignee: Unassigned
          • Reporter: Ilya Ivanov
          • Votes: 2
          • Watchers: 2
