MINA
  1. MINA
  2. DIRMINA-653

IoSession.write not thread-safe? Loosing messages under heavy multi-threaded write on same session.

    Details

    • Type: Bug Bug
    • Status: Closed
    • Priority: Major Major
    • Resolution: Fixed
    • Affects Version/s: 2.0.0-M4
    • Fix Version/s: 2.0.0-M5
    • Component/s: Core, Filter
    • Labels:
      None
    • Environment:
      Windows Vista 64-bit

      Description

      I am writing a stress-test that tests multi-thread safetyness of our stateless encoder / decoder under heavy load and I am observing that messages are silently lost during session.write(Object), (the lost messages do not seem to reach the underlying socket buffer at all).

      I am using one encoder / decoder that is stateless. No executor filter, only the filter codec and a basic io handler.

      Synchronizing on the session.write makes the problem go away;

      synchronized (session)

      { future = session.write(message); }

      Do I really have to synchronize on the session to solve this issue?

        Activity

        Hide
        Paul Gregoire added a comment -

        Jens, you could also use a more 1.6+ solution by using a Semaphore with one permit and thread fairness enabled.
        private Semaphore lock = new Semaphore(1, true);

        try

        { lock.aquire(); future = session.write(message); }

        catch (Exception ex)

        { // log something }

        finally

        { lock.release(); }

        This would give the same effect as sync without any extra grief. Aquire also takes a timeout should that be needed.

        Show
        Paul Gregoire added a comment - Jens, you could also use a more 1.6+ solution by using a Semaphore with one permit and thread fairness enabled. private Semaphore lock = new Semaphore(1, true); try { lock.aquire(); future = session.write(message); } catch (Exception ex) { // log something } finally { lock.release(); } This would give the same effect as sync without any extra grief. Aquire also takes a timeout should that be needed.
        Hide
        Jens Reimann added a comment -

        Well in my opinion this issue is still open. I stumbled right over it and the fix only solves one part of the problem. I would guess, but this is only a guess, that this would also affect the SslFilter.

        After some debugging I found out that when multiple threads call "session.write" at the same time, the filter chain is processes unsynchronized by default. This will make several parallel calls to "doFilterWrite" and also to "deflate" in the "Zlib" helper class. While the stream is synchronized, the stream state is not synchronized with the underlying socket. The result of the "deflate" could contain a dictionary update if the zlib stream. If the following parallel call uses this new dictionary it needs to be written to the socket after the first call updated the dictionary.

        But this is not guaranteed! When the "deflate" method returns there is no lock on the filter chain anymore and the second packet may overtake the first packet (which holds the dictionary update). Hence the result on the other side will be a packet which is decoded with the wrong dictionary.

        It took my quite a while to figure that out since every test case I made was single threaded.

        So the assumption that IoSession is thread safe is simply wrong. I am not sure how to solve the issue here. I guess a simple mutex lock when writing to a session would do the trick in our case. In any way I would suggest to update the documentation to reflect that situation.

        Show
        Jens Reimann added a comment - Well in my opinion this issue is still open. I stumbled right over it and the fix only solves one part of the problem. I would guess, but this is only a guess, that this would also affect the SslFilter. After some debugging I found out that when multiple threads call "session.write" at the same time, the filter chain is processes unsynchronized by default. This will make several parallel calls to "doFilterWrite" and also to "deflate" in the "Zlib" helper class. While the stream is synchronized, the stream state is not synchronized with the underlying socket. The result of the "deflate" could contain a dictionary update if the zlib stream. If the following parallel call uses this new dictionary it needs to be written to the socket after the first call updated the dictionary. But this is not guaranteed! When the "deflate" method returns there is no lock on the filter chain anymore and the second packet may overtake the first packet (which holds the dictionary update). Hence the result on the other side will be a packet which is decoded with the wrong dictionary. It took my quite a while to figure that out since every test case I made was single threaded. So the assumption that IoSession is thread safe is simply wrong. I am not sure how to solve the issue here. I guess a simple mutex lock when writing to a session would do the trick in our case. In any way I would suggest to update the documentation to reflect that situation.
        Hide
        Christian Schwarz added a comment -

        Yes, the BufferDataException is thrown when we use the fixed code ! The fix works good for the race condition of the compressor, the following exception disappeared:
        :java.io.IOException: Unknown error. Error code : -3 and message : invalid stored block lengths

        Show
        Christian Schwarz added a comment - Yes, the BufferDataException is thrown when we use the fixed code ! The fix works good for the race condition of the compressor, the following exception disappeared: :java.io.IOException: Unknown error. Error code : -3 and message : invalid stored block lengths
        Hide
        Emmanuel Lecharny added a comment -

        but does it happens with the code I fixed ? (ie , afted I added the suggested synchronized section ) ?

        Show
        Emmanuel Lecharny added a comment - but does it happens with the code I fixed ? (ie , afted I added the suggested synchronized section ) ?
        Hide
        Christian Schwarz added a comment -

        It took the whole week to reproduce it, but finally we have once again! After some time we get an Exception from the SerialisationDecoder, it seem that at a specific point the SerialisationDecoder get malicious data of the CompressionDecoder.

        org.apache.mina.filter.codec.ProtocolDecoderException: org.apache.mina.core.buffer.BufferDataException: java.io.StreamCorruptedException: invalid type code: 02 (Hexdump: 00 00 02 C8 AC ED 00 05 73 72 01 00 2B 65 75 2E 67 65 6D 74 65 63 2E 77 6F 74 61 6E 2E 63 6F 6D 6D 73 65 72 76 69 63 65 2E 6D 65 73 73 61 67 65 2E 4D 65 73 73 61 67 65 78 70 00 00 03 13 73 72 01 00 3C 63 6F 6D 2E 69 78 65 6C 6C 65 6E 63 65 2E 66 6D 73 2E 64 61 74 61 6D 6F 64 65 6C 2E 63 6F 6D 6D 2E 64 61 74 61 6D 6F 64 65 6C 2E 44 61 74 61 4D 6F 64 65 6C 43 6F 6E 74 65 6E 74 73 78 70 75 72 01 00 2D 5B 4C 63 6F 6D 2E 69 78 65 6C 6C 65 6E 63 65 2E 66 6D 73 2E 63 6F 6D 6D 6F 6E 73 2E 64 61 74 61 6D 6F 64 65 6C 2E 49 53 74 61 74 65 3B 78 70 00 00 00 01 73 72 01 00 2E 63 6F 6D 2E 69 78 65 6C 6C 65 6E 63 65 2E 66 6D 73 2E 63 6F 6D 6D 6F 6E 73 2E 64 61 74 61 6D 6F 64 65 6C 2E 69 6D 70 6C 2E 53 74 61 74 65 78 72 01 00 34 63 6F 6D 2E 69 78 65 6C 6C 65 6E 63 65 2E 66 6D 73 2E 63 6F 6D 6D 6F 6E 73 2E 64 61 74 61 6D 6F 64 65 6C 2E 69 6D 70 6C 2E 42 61 73 65 45 6C 65 6D 65 6E 74 78 70 74 00 06 53 49 4C 45 4E 54 74 00 04 52 75 68 65 70 70 70 74 00 10 73 74 61 74 65 5F 6E 6F 72 6D 61 6C 2E 67 69 66 70 77 04 00 00 00 00 78 7E 72 01 00 3D 63 6F 6D 2E 69 78 65 6C 6C 65 6E 63 65 2E 66 6D 73 2E 64 61 74 61 6D 6F 64 65 6C 2E 63 6F 6D 6D 2E 64 61 74 61 6D 6F 64 65 6C 2E 53 65 72 76 69 63 65 52 65 71 75 65 73 74 54 79 70 65 78 72 01 00 0E 6A 61 76 61 2E 6C 61 6E 67 2E 45 6E 75 6D 78 70 74 00 15 41 32 4D 5F 45 56 54 5F 53 54 41 54 45 5F 43 48 41 4E 47 45 44 73 72 01 00 37 6F 72 67 2E 74 69 73 65 2E 77 6F 74 61 6E 2E 63 6F 6D 6D 6F 6E 73 2E 64 61 74 61 6D 6F 64 65 6C 2E 69 6D 70 6C 2E 45 6C 65 6D 65 6E 74 49 64 65 6E 74 69 66 69 65 72 02 FB AC ED 00 05 73 72 01 00 2B 65 75 2E 67 6C 2E 4C 69 6E 6B 65 64 4C 69 73 74 78 70 77 04 00 00 00 04 74 00 04 72 6F 6F 74 74 00 02 23 31 74 00 0C 64 75 6D 6D 79 20 72 6F 6F 74 3A 31 74 00 01 31 78 74 00 40 63 6F 6D 2E 69 78 65 6C 6C 65 6E 63 65 2E 66 6D 73 2E 64 61 74 61 6D 6F 64 65 6C 2E 63 6F 6D 6D 2E 6D 61 6E 61 67 65 72 2E 4D 61 6E 61 67 65 72 44 61 74 61 6D 6F 64 65 6C 53 65 72 76 69 63 65 7E 72 01 00 30 65 75 2E 67 65 6D 74 65 63 2E 77 6F 74 61 6E 2E 63 6F 6D 6D 73 65 72 76 69 63 65 2E 6D 65 73 73 61 67 65 2E 4D 65 73 73 61 67 65 24 54 79 70 65 78 71 00 7E 00 0D 74 00 08 49 4E 46 4F 5F 4D 53 47)
        at org.apache.mina.filter.codec.ProtocolCodecFilter.messageReceived(ProtocolCodecFilter.java:251)
        at org.apache.mina.core.filterchain.DefaultIoFilterChain.callNextMessageReceived(DefaultIoFilterChain.java:434)
        at org.apache.mina.core.filterchain.DefaultIoFilterChain.access$5(DefaultIoFilterChain.java:429)
        at org.apache.mina.core.filterchain.DefaultIoFilterChain$EntryImpl$1.messageReceived(DefaultIoFilterChain.java:796)
        at org.apache.mina.filter.compression.CompressionFilter.messageReceived(CompressionFilter.java:157)
        at org.apache.mina.core.filterchain.DefaultIoFilterChain.callNextMessageReceived(DefaultIoFilterChain.java:434)
        at org.apache.mina.core.filterchain.DefaultIoFilterChain.access$5(DefaultIoFilterChain.java:429)
        at org.apache.mina.core.filterchain.DefaultIoFilterChain$EntryImpl$1.messageReceived(DefaultIoFilterChain.java:796)
        at org.apache.mina.core.filterchain.IoFilterAdapter.messageReceived(IoFilterAdapter.java:119)
        at org.apache.mina.core.filterchain.DefaultIoFilterChain.callNextMessageReceived(DefaultIoFilterChain.java:434)
        at org.apache.mina.core.filterchain.DefaultIoFilterChain.fireMessageReceived(DefaultIoFilterChain.java:426)
        at org.apache.mina.core.polling.AbstractPollingIoProcessor.read(AbstractPollingIoProcessor.java:693)
        at org.apache.mina.core.polling.AbstractPollingIoProcessor.process(AbstractPollingIoProcessor.java:646)
        at org.apache.mina.core.polling.AbstractPollingIoProcessor.process(AbstractPollingIoProcessor.java:635)
        at org.apache.mina.core.polling.AbstractPollingIoProcessor.access$7(AbstractPollingIoProcessor.java:632)
        at org.apache.mina.core.polling.AbstractPollingIoProcessor$Processor.run(AbstractPollingIoProcessor.java:1079)
        at org.apache.mina.util.NamePreservingRunnable.run(NamePreservingRunnable.java:64)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:619)
        Caused by: org.apache.mina.core.buffer.BufferDataException: java.io.StreamCorruptedException: invalid type code: 02
        at org.apache.mina.core.buffer.AbstractIoBuffer.getObject(AbstractIoBuffer.java:1984)
        at org.apache.mina.filter.codec.serialization.ObjectSerializationDecoder.doDecode(ObjectSerializationDecoder.java:92)
        at org.apache.mina.filter.codec.CumulativeProtocolDecoder.decode(CumulativeProtocolDecoder.java:178)
        at org.apache.mina.filter.codec.ProtocolCodecFilter.messageReceived(ProtocolCodecFilter.java:241)
        ... 19 more

        ________________________________________________________________________________________________________________

        org.apache.mina.filter.codec.ProtocolDecoderException: org.apache.mina.core.buffer.BufferDataException: dataLength: 1684108439 (Hexdump: 64 61 74 97 AC ED 00 05 73 72 01 00 2B 65 75 2E 67 65 6D 74 65 63 2E 77 6F 74 61 6E 2E 63 6F 6D 6D 73 65 72 76 69 63 65 2E 6D 65 73 73 61 67 65 2E 4D 65 73 73 61 67 65 78 70 00 00 03 41 70 70 67 65 72 44 61 74 61 6D 6F 64 65 6C 53 65 72 76 69 63 65 7E 72 01 00 30 65 75 2E 67 65 6D 74 65 63 2E 77 6F 74 61 6E 2E 63 6F 6D 6D 73 65 72 76 69 63 65 2E 6D 65 73 73 61 67 65 2E 4D 65 73 73 61 67 65 24 54 79 70 65 78 71 00 7E 0E 4B 45 45 50 5F 41 4C 49 56 4F 5F 4D 53 47)
        at org.apache.mina.filter.codec.ProtocolCodecFilter.messageReceived(ProtocolCodecFilter.java:251)
        at org.apache.mina.core.filterchain.DefaultIoFilterChain.callNextMessageReceived(DefaultIoFilterChain.java:434)
        at org.apache.mina.core.filterchain.DefaultIoFilterChain.access$5(DefaultIoFilterChain.java:429)
        at org.apache.mina.core.filterchain.DefaultIoFilterChain$EntryImpl$1.messageReceived(DefaultIoFilterChain.java:796)
        at org.apache.mina.filter.compression.CompressionFilter.messageReceived(CompressionFilter.java:157)
        at org.apache.mina.core.filterchain.DefaultIoFilterChain.callNextMessageReceived(DefaultIoFilterChain.java:434)
        at org.apache.mina.core.filterchain.DefaultIoFilterChain.access$5(DefaultIoFilterChain.java:429)
        at org.apache.mina.core.filterchain.DefaultIoFilterChain$EntryImpl$1.messageReceived(DefaultIoFilterChain.java:796)
        at org.apache.mina.core.filterchain.IoFilterAdapter.messageReceived(IoFilterAdapter.java:119)
        at org.apache.mina.core.filterchain.DefaultIoFilterChain.callNextMessageReceived(DefaultIoFilterChain.java:434)
        at org.apache.mina.core.filterchain.DefaultIoFilterChain.fireMessageReceived(DefaultIoFilterChain.java:426)
        at org.apache.mina.core.polling.AbstractPollingIoProcessor.read(AbstractPollingIoProcessor.java:693)
        at org.apache.mina.core.polling.AbstractPollingIoProcessor.process(AbstractPollingIoProcessor.java:646)
        at org.apache.mina.core.polling.AbstractPollingIoProcessor.process(AbstractPollingIoProcessor.java:635)
        at org.apache.mina.core.polling.AbstractPollingIoProcessor.access$7(AbstractPollingIoProcessor.java:632)
        at org.apache.mina.core.polling.AbstractPollingIoProcessor$Processor.run(AbstractPollingIoProcessor.java:1079)
        at org.apache.mina.util.NamePreservingRunnable.run(NamePreservingRunnable.java:64)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:619)
        Caused by: org.apache.mina.core.buffer.BufferDataException: dataLength: 1684108439
        at org.apache.mina.core.buffer.AbstractIoBuffer.prefixedDataAvailable(AbstractIoBuffer.java:2058)
        at org.apache.mina.filter.codec.serialization.ObjectSerializationDecoder.doDecode(ObjectSerializationDecoder.java:88)
        at org.apache.mina.filter.codec.CumulativeProtocolDecoder.decode(CumulativeProtocolDecoder.java:178)
        at org.apache.mina.filter.codec.ProtocolCodecFilter.messageReceived(ProtocolCodecFilter.java:241)
        ... 19 more

        Show
        Christian Schwarz added a comment - It took the whole week to reproduce it, but finally we have once again! After some time we get an Exception from the SerialisationDecoder, it seem that at a specific point the SerialisationDecoder get malicious data of the CompressionDecoder. org.apache.mina.filter.codec.ProtocolDecoderException: org.apache.mina.core.buffer.BufferDataException: java.io.StreamCorruptedException: invalid type code: 02 (Hexdump: 00 00 02 C8 AC ED 00 05 73 72 01 00 2B 65 75 2E 67 65 6D 74 65 63 2E 77 6F 74 61 6E 2E 63 6F 6D 6D 73 65 72 76 69 63 65 2E 6D 65 73 73 61 67 65 2E 4D 65 73 73 61 67 65 78 70 00 00 03 13 73 72 01 00 3C 63 6F 6D 2E 69 78 65 6C 6C 65 6E 63 65 2E 66 6D 73 2E 64 61 74 61 6D 6F 64 65 6C 2E 63 6F 6D 6D 2E 64 61 74 61 6D 6F 64 65 6C 2E 44 61 74 61 4D 6F 64 65 6C 43 6F 6E 74 65 6E 74 73 78 70 75 72 01 00 2D 5B 4C 63 6F 6D 2E 69 78 65 6C 6C 65 6E 63 65 2E 66 6D 73 2E 63 6F 6D 6D 6F 6E 73 2E 64 61 74 61 6D 6F 64 65 6C 2E 49 53 74 61 74 65 3B 78 70 00 00 00 01 73 72 01 00 2E 63 6F 6D 2E 69 78 65 6C 6C 65 6E 63 65 2E 66 6D 73 2E 63 6F 6D 6D 6F 6E 73 2E 64 61 74 61 6D 6F 64 65 6C 2E 69 6D 70 6C 2E 53 74 61 74 65 78 72 01 00 34 63 6F 6D 2E 69 78 65 6C 6C 65 6E 63 65 2E 66 6D 73 2E 63 6F 6D 6D 6F 6E 73 2E 64 61 74 61 6D 6F 64 65 6C 2E 69 6D 70 6C 2E 42 61 73 65 45 6C 65 6D 65 6E 74 78 70 74 00 06 53 49 4C 45 4E 54 74 00 04 52 75 68 65 70 70 70 74 00 10 73 74 61 74 65 5F 6E 6F 72 6D 61 6C 2E 67 69 66 70 77 04 00 00 00 00 78 7E 72 01 00 3D 63 6F 6D 2E 69 78 65 6C 6C 65 6E 63 65 2E 66 6D 73 2E 64 61 74 61 6D 6F 64 65 6C 2E 63 6F 6D 6D 2E 64 61 74 61 6D 6F 64 65 6C 2E 53 65 72 76 69 63 65 52 65 71 75 65 73 74 54 79 70 65 78 72 01 00 0E 6A 61 76 61 2E 6C 61 6E 67 2E 45 6E 75 6D 78 70 74 00 15 41 32 4D 5F 45 56 54 5F 53 54 41 54 45 5F 43 48 41 4E 47 45 44 73 72 01 00 37 6F 72 67 2E 74 69 73 65 2E 77 6F 74 61 6E 2E 63 6F 6D 6D 6F 6E 73 2E 64 61 74 61 6D 6F 64 65 6C 2E 69 6D 70 6C 2E 45 6C 65 6D 65 6E 74 49 64 65 6E 74 69 66 69 65 72 02 FB AC ED 00 05 73 72 01 00 2B 65 75 2E 67 6C 2E 4C 69 6E 6B 65 64 4C 69 73 74 78 70 77 04 00 00 00 04 74 00 04 72 6F 6F 74 74 00 02 23 31 74 00 0C 64 75 6D 6D 79 20 72 6F 6F 74 3A 31 74 00 01 31 78 74 00 40 63 6F 6D 2E 69 78 65 6C 6C 65 6E 63 65 2E 66 6D 73 2E 64 61 74 61 6D 6F 64 65 6C 2E 63 6F 6D 6D 2E 6D 61 6E 61 67 65 72 2E 4D 61 6E 61 67 65 72 44 61 74 61 6D 6F 64 65 6C 53 65 72 76 69 63 65 7E 72 01 00 30 65 75 2E 67 65 6D 74 65 63 2E 77 6F 74 61 6E 2E 63 6F 6D 6D 73 65 72 76 69 63 65 2E 6D 65 73 73 61 67 65 2E 4D 65 73 73 61 67 65 24 54 79 70 65 78 71 00 7E 00 0D 74 00 08 49 4E 46 4F 5F 4D 53 47) at org.apache.mina.filter.codec.ProtocolCodecFilter.messageReceived(ProtocolCodecFilter.java:251) at org.apache.mina.core.filterchain.DefaultIoFilterChain.callNextMessageReceived(DefaultIoFilterChain.java:434) at org.apache.mina.core.filterchain.DefaultIoFilterChain.access$5(DefaultIoFilterChain.java:429) at org.apache.mina.core.filterchain.DefaultIoFilterChain$EntryImpl$1.messageReceived(DefaultIoFilterChain.java:796) at org.apache.mina.filter.compression.CompressionFilter.messageReceived(CompressionFilter.java:157) at org.apache.mina.core.filterchain.DefaultIoFilterChain.callNextMessageReceived(DefaultIoFilterChain.java:434) at org.apache.mina.core.filterchain.DefaultIoFilterChain.access$5(DefaultIoFilterChain.java:429) at org.apache.mina.core.filterchain.DefaultIoFilterChain$EntryImpl$1.messageReceived(DefaultIoFilterChain.java:796) at org.apache.mina.core.filterchain.IoFilterAdapter.messageReceived(IoFilterAdapter.java:119) at org.apache.mina.core.filterchain.DefaultIoFilterChain.callNextMessageReceived(DefaultIoFilterChain.java:434) at org.apache.mina.core.filterchain.DefaultIoFilterChain.fireMessageReceived(DefaultIoFilterChain.java:426) at org.apache.mina.core.polling.AbstractPollingIoProcessor.read(AbstractPollingIoProcessor.java:693) at org.apache.mina.core.polling.AbstractPollingIoProcessor.process(AbstractPollingIoProcessor.java:646) at org.apache.mina.core.polling.AbstractPollingIoProcessor.process(AbstractPollingIoProcessor.java:635) at org.apache.mina.core.polling.AbstractPollingIoProcessor.access$7(AbstractPollingIoProcessor.java:632) at org.apache.mina.core.polling.AbstractPollingIoProcessor$Processor.run(AbstractPollingIoProcessor.java:1079) at org.apache.mina.util.NamePreservingRunnable.run(NamePreservingRunnable.java:64) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:619) Caused by: org.apache.mina.core.buffer.BufferDataException: java.io.StreamCorruptedException: invalid type code: 02 at org.apache.mina.core.buffer.AbstractIoBuffer.getObject(AbstractIoBuffer.java:1984) at org.apache.mina.filter.codec.serialization.ObjectSerializationDecoder.doDecode(ObjectSerializationDecoder.java:92) at org.apache.mina.filter.codec.CumulativeProtocolDecoder.decode(CumulativeProtocolDecoder.java:178) at org.apache.mina.filter.codec.ProtocolCodecFilter.messageReceived(ProtocolCodecFilter.java:241) ... 19 more ________________________________________________________________________________________________________________ org.apache.mina.filter.codec.ProtocolDecoderException: org.apache.mina.core.buffer.BufferDataException: dataLength: 1684108439 (Hexdump: 64 61 74 97 AC ED 00 05 73 72 01 00 2B 65 75 2E 67 65 6D 74 65 63 2E 77 6F 74 61 6E 2E 63 6F 6D 6D 73 65 72 76 69 63 65 2E 6D 65 73 73 61 67 65 2E 4D 65 73 73 61 67 65 78 70 00 00 03 41 70 70 67 65 72 44 61 74 61 6D 6F 64 65 6C 53 65 72 76 69 63 65 7E 72 01 00 30 65 75 2E 67 65 6D 74 65 63 2E 77 6F 74 61 6E 2E 63 6F 6D 6D 73 65 72 76 69 63 65 2E 6D 65 73 73 61 67 65 2E 4D 65 73 73 61 67 65 24 54 79 70 65 78 71 00 7E 0E 4B 45 45 50 5F 41 4C 49 56 4F 5F 4D 53 47) at org.apache.mina.filter.codec.ProtocolCodecFilter.messageReceived(ProtocolCodecFilter.java:251) at org.apache.mina.core.filterchain.DefaultIoFilterChain.callNextMessageReceived(DefaultIoFilterChain.java:434) at org.apache.mina.core.filterchain.DefaultIoFilterChain.access$5(DefaultIoFilterChain.java:429) at org.apache.mina.core.filterchain.DefaultIoFilterChain$EntryImpl$1.messageReceived(DefaultIoFilterChain.java:796) at org.apache.mina.filter.compression.CompressionFilter.messageReceived(CompressionFilter.java:157) at org.apache.mina.core.filterchain.DefaultIoFilterChain.callNextMessageReceived(DefaultIoFilterChain.java:434) at org.apache.mina.core.filterchain.DefaultIoFilterChain.access$5(DefaultIoFilterChain.java:429) at org.apache.mina.core.filterchain.DefaultIoFilterChain$EntryImpl$1.messageReceived(DefaultIoFilterChain.java:796) at org.apache.mina.core.filterchain.IoFilterAdapter.messageReceived(IoFilterAdapter.java:119) at org.apache.mina.core.filterchain.DefaultIoFilterChain.callNextMessageReceived(DefaultIoFilterChain.java:434) at org.apache.mina.core.filterchain.DefaultIoFilterChain.fireMessageReceived(DefaultIoFilterChain.java:426) at org.apache.mina.core.polling.AbstractPollingIoProcessor.read(AbstractPollingIoProcessor.java:693) at org.apache.mina.core.polling.AbstractPollingIoProcessor.process(AbstractPollingIoProcessor.java:646) at org.apache.mina.core.polling.AbstractPollingIoProcessor.process(AbstractPollingIoProcessor.java:635) at org.apache.mina.core.polling.AbstractPollingIoProcessor.access$7(AbstractPollingIoProcessor.java:632) at org.apache.mina.core.polling.AbstractPollingIoProcessor$Processor.run(AbstractPollingIoProcessor.java:1079) at org.apache.mina.util.NamePreservingRunnable.run(NamePreservingRunnable.java:64) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:619) Caused by: org.apache.mina.core.buffer.BufferDataException: dataLength: 1684108439 at org.apache.mina.core.buffer.AbstractIoBuffer.prefixedDataAvailable(AbstractIoBuffer.java:2058) at org.apache.mina.filter.codec.serialization.ObjectSerializationDecoder.doDecode(ObjectSerializationDecoder.java:88) at org.apache.mina.filter.codec.CumulativeProtocolDecoder.decode(CumulativeProtocolDecoder.java:178) at org.apache.mina.filter.codec.ProtocolCodecFilter.messageReceived(ProtocolCodecFilter.java:241) ... 19 more
        Hide
        Emmanuel Lecharny added a comment -

        Hmmm.. The buffer does not contain compressed data.

        Anyway you can get a stack trace ?

        Show
        Emmanuel Lecharny added a comment - Hmmm.. The buffer does not contain compressed data. Anyway you can get a stack trace ?
        Hide
        Emmanuel Lecharny added a comment -

        Seems like the buffer is not cleaned up, and the decoder cannot kick a new decoding then.

        Clearly, the decoder 'thinks' that the four first bytes are the length (0x770400EB == 1996751083)

        Is this happening after some specific problem ?

        Show
        Emmanuel Lecharny added a comment - Seems like the buffer is not cleaned up, and the decoder cannot kick a new decoding then. Clearly, the decoder 'thinks' that the four first bytes are the length (0x770400EB == 1996751083) Is this happening after some specific problem ?
        Hide
        Christian Schwarz added a comment -

        @Emmanuel

        Sometimes i get the following Exception, which appears after every decode once it occures:

        org.apache.mina.filter.codec.ProtocolDecoderException: org.apache.mina.core.buffer.BufferDataException: dataLength: 1996751083 (Hexdump: 77 04 00 EB 76 69 63 65 2E 6D 65 73 73 61 67 65 2E 4D 65 73 73 61 67 65 78 70 00 00 00 02 73 72 01 00 3C 63 6F 6D 2E 69 78 65 6C 6C 65 6E 63 65 2E 66 6D 73 2E 64 61 74 61 6D 6F 64 65 6C 2E 63 6F 6D 6D 2E 64 61 74 61 6D 6F 64 65 6C 2E 43 75 73 74 6F 6D 50 72 6F 00 19 73 71 00 7E 00 0D 00 00 00 00 77 04 00 00 00 0A 78 71 00 7E 26 68 73 71 00 7E 00 1C 70 70 73 71 00 7E 00 1E 73 71 00 7E 00 20 3F 40 00 00 00 00 00 0C 77 08 00 00 00 10 00 00 00 00 78 73 71 00 7E 00 22 73 71 00 7E 00 0D 00 00 00 00 77 04 00 00 00 0A 78 78 78 78 78 78 78 78 78 78 78 78 78 78 78 73 71 00 7E 00 12 77 0C 00 00 00 10 3F 40 00 00 00 00 00 06 71 00 7E 10 08 73 71 00 7E 00 14 74 00 0B 4D 41 4C 46 55 4E 43 54 49 4F 4E 74 00 08 53 74 C3 B6)

        my Fiterchain:
        fillterChain.addLast("compress", new CompressionFilter(COMPRESSION_MAX));
        filterChain.addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory(classLoader)); );
        filterChain.addLast("logger", new LoggingFilter());

        I hope it helps...
        Christian

        Show
        Christian Schwarz added a comment - @Emmanuel Sometimes i get the following Exception, which appears after every decode once it occures: org.apache.mina.filter.codec.ProtocolDecoderException: org.apache.mina.core.buffer.BufferDataException: dataLength: 1996751083 (Hexdump: 77 04 00 EB 76 69 63 65 2E 6D 65 73 73 61 67 65 2E 4D 65 73 73 61 67 65 78 70 00 00 00 02 73 72 01 00 3C 63 6F 6D 2E 69 78 65 6C 6C 65 6E 63 65 2E 66 6D 73 2E 64 61 74 61 6D 6F 64 65 6C 2E 63 6F 6D 6D 2E 64 61 74 61 6D 6F 64 65 6C 2E 43 75 73 74 6F 6D 50 72 6F 00 19 73 71 00 7E 00 0D 00 00 00 00 77 04 00 00 00 0A 78 71 00 7E 26 68 73 71 00 7E 00 1C 70 70 73 71 00 7E 00 1E 73 71 00 7E 00 20 3F 40 00 00 00 00 00 0C 77 08 00 00 00 10 00 00 00 00 78 73 71 00 7E 00 22 73 71 00 7E 00 0D 00 00 00 00 77 04 00 00 00 0A 78 78 78 78 78 78 78 78 78 78 78 78 78 78 78 73 71 00 7E 00 12 77 0C 00 00 00 10 3F 40 00 00 00 00 00 06 71 00 7E 10 08 73 71 00 7E 00 14 74 00 0B 4D 41 4C 46 55 4E 43 54 49 4F 4E 74 00 08 53 74 C3 B6) my Fiterchain: fillterChain.addLast("compress", new CompressionFilter(COMPRESSION_MAX)); filterChain.addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory(classLoader)); ); filterChain.addLast("logger", new LoggingFilter()); I hope it helps... Christian
        Hide
        Emmanuel Lecharny added a comment -

        I have added a synchronized section on ZLib.java, following Yannick suggestion. It's available in http://svn.apache.org/viewvc/mina/branches/2.0.3/

        here is the patch :
        http://svn.apache.org/viewvc?rev=1088959&view=rev

        Christian, can you tell me if it fixes your issue ?

        Show
        Emmanuel Lecharny added a comment - I have added a synchronized section on ZLib.java, following Yannick suggestion. It's available in http://svn.apache.org/viewvc/mina/branches/2.0.3/ here is the patch : http://svn.apache.org/viewvc?rev=1088959&view=rev Christian, can you tell me if it fixes your issue ?
        Hide
        Christian Schwarz added a comment -

        Hi Yannick,

        thanks for pointing out how to fix the CompressionFilter datarace !
        -> For the record : This Bug still exists in 2.0.2 !

        Thanks Chris

        Show
        Christian Schwarz added a comment - Hi Yannick, thanks for pointing out how to fix the CompressionFilter datarace ! -> For the record : This Bug still exists in 2.0.2 ! Thanks Chris
        Hide
        Yannick Lecaillez added a comment -

        Hi,

        I got the same problem using CompressionFilter.

        The problem come from CompressionFilter which is not thread safe. MINA documentation
        sates clearly that "IoSession is thread-safe. But please note that performing more than one write(Object) calls at the same time will cause the IoFilter.filterWrite(IoFilter.NextFilter,IoSession,WriteRequest) to be executed simultaneously, and therefore you have to make sure the IoFilter implementations you're using are thread-safe, too. "

        The problem here is that zStream in CompressionFilter is not synchronized an so, not thread-safe.
        We fixed the problem simply by surrounding zStream access with a "synchronized" in CompressionFilter.

        Hope that helps,
        Yannick.

        Show
        Yannick Lecaillez added a comment - Hi, I got the same problem using CompressionFilter. The problem come from CompressionFilter which is not thread safe. MINA documentation sates clearly that "IoSession is thread-safe. But please note that performing more than one write(Object) calls at the same time will cause the IoFilter.filterWrite(IoFilter.NextFilter,IoSession,WriteRequest) to be executed simultaneously, and therefore you have to make sure the IoFilter implementations you're using are thread-safe, too. " The problem here is that zStream in CompressionFilter is not synchronized an so, not thread-safe. We fixed the problem simply by surrounding zStream access with a "synchronized" in CompressionFilter. Hope that helps, Yannick.
        Hide
        Florian Weigand added a comment -

        I am still able to reproduce this problem in RC1 everytime (sooner or later) in my application. I am using mina-core-2.0.0-RC1, mina-filter-compression-2.0.0-RC1 and jzlib-1.0.7.

        I have added a CompressionFilter and after that my ProtocolCodecFilter(new MyCodecFactory()) to the Chain on both my client and server application.

        I am calling IoSession.write() from at least 3 different threads concurrently in the client. One of the threads is named "pool-4-thread-1". the others are custom threads.

        after a while the client throws some exception:

        java.lang.ArrayIndexOutOfBoundsException: -1
        at com.jcraft.jzlib.Deflate._tr_tally(Deflate.java:640)
        at com.jcraft.jzlib.Deflate.deflate_slow(Deflate.java:1172)
        at com.jcraft.jzlib.Deflate.deflate(Deflate.java:1567)
        at com.jcraft.jzlib.ZStream.deflate(ZStream.java:133)
        at org.apache.mina.filter.compression.Zlib.deflate(Zlib.java:192)
        at org.apache.mina.filter.compression.CompressionFilter.doFilterWrite(CompressionFilter.java:187)

        or this one:

        java.io.IOException: Compression failed with return value : -5
        at org.apache.mina.filter.compression.Zlib.deflate(Zlib.java:196)

        this happens on occasion, but even if i dont see this or some similar exception on the client, i always see this exception on the server:

        java.io.IOException: Unknown error. Error code : -3 and message : invalid stored block lengths
        at org.apache.mina.filter.compression.Zlib.inflate(Zlib.java:153)
        at org.apache.mina.filter.compression.CompressionFilter.messageReceived(CompressionFilter.java:156)
        at org.apache.mina.core.filterchain.DefaultIoFilterChain.callNextMessageReceived(DefaultIoFilterChain.java:434)
        at org.apache.mina.core.filterchain.DefaultIoFilterChain.access$1200(DefaultIoFilterChain.java:46)
        at org.apache.mina.core.filterchain.DefaultIoFilterChain$EntryImpl$1.messageReceived(DefaultIoFilterChain.java:793)
        at org.apache.mina.core.filterchain.IoFilterAdapter.messageReceived(IoFilterAdapter.java:119)
        at org.apache.mina.core.filterchain.DefaultIoFilterChain.callNextMessageReceived(DefaultIoFilterChain.java:434)
        at org.apache.mina.core.filterchain.DefaultIoFilterChain.fireMessageReceived(DefaultIoFilterChain.java:426)
        at org.apache.mina.core.polling.AbstractPollingIoProcessor.read(AbstractPollingIoProcessor.java:638)
        at org.apache.mina.core.polling.AbstractPollingIoProcessor.process(AbstractPollingIoProcessor.java:598)
        at org.apache.mina.core.polling.AbstractPollingIoProcessor.process(AbstractPollingIoProcessor.java:587)
        at org.apache.mina.core.polling.AbstractPollingIoProcessor.access$400(AbstractPollingIoProcessor.java:61)
        at org.apache.mina.core.polling.AbstractPollingIoProcessor$Processor.run(AbstractPollingIoProcessor.java:969)
        at org.apache.mina.util.NamePreservingRunnable.run(NamePreservingRunnable.java:64)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
        at java.lang.Thread.run(Thread.java:595)

        i can delay or avoid the exception by using the debugger and break to only send one message at a time.

        synchronizing the IoSession#write() on the client makes the problem disappear and everything works as expected, but at the cost of heavy performance loss.

        looks to me like a single internal buffer to hold partial data concurrently overridden by different threads, which corrupts the buffer and hence the server fails to decompress the data.

        i am using Java 1.5.0_16, running a localhost connection in two different JVMs.

        please look into this again.

        Show
        Florian Weigand added a comment - I am still able to reproduce this problem in RC1 everytime (sooner or later) in my application. I am using mina-core-2.0.0-RC1, mina-filter-compression-2.0.0-RC1 and jzlib-1.0.7. I have added a CompressionFilter and after that my ProtocolCodecFilter(new MyCodecFactory()) to the Chain on both my client and server application. I am calling IoSession.write() from at least 3 different threads concurrently in the client. One of the threads is named "pool-4-thread-1". the others are custom threads. after a while the client throws some exception: java.lang.ArrayIndexOutOfBoundsException: -1 at com.jcraft.jzlib.Deflate._tr_tally(Deflate.java:640) at com.jcraft.jzlib.Deflate.deflate_slow(Deflate.java:1172) at com.jcraft.jzlib.Deflate.deflate(Deflate.java:1567) at com.jcraft.jzlib.ZStream.deflate(ZStream.java:133) at org.apache.mina.filter.compression.Zlib.deflate(Zlib.java:192) at org.apache.mina.filter.compression.CompressionFilter.doFilterWrite(CompressionFilter.java:187) or this one: java.io.IOException: Compression failed with return value : -5 at org.apache.mina.filter.compression.Zlib.deflate(Zlib.java:196) this happens on occasion, but even if i dont see this or some similar exception on the client, i always see this exception on the server: java.io.IOException: Unknown error. Error code : -3 and message : invalid stored block lengths at org.apache.mina.filter.compression.Zlib.inflate(Zlib.java:153) at org.apache.mina.filter.compression.CompressionFilter.messageReceived(CompressionFilter.java:156) at org.apache.mina.core.filterchain.DefaultIoFilterChain.callNextMessageReceived(DefaultIoFilterChain.java:434) at org.apache.mina.core.filterchain.DefaultIoFilterChain.access$1200(DefaultIoFilterChain.java:46) at org.apache.mina.core.filterchain.DefaultIoFilterChain$EntryImpl$1.messageReceived(DefaultIoFilterChain.java:793) at org.apache.mina.core.filterchain.IoFilterAdapter.messageReceived(IoFilterAdapter.java:119) at org.apache.mina.core.filterchain.DefaultIoFilterChain.callNextMessageReceived(DefaultIoFilterChain.java:434) at org.apache.mina.core.filterchain.DefaultIoFilterChain.fireMessageReceived(DefaultIoFilterChain.java:426) at org.apache.mina.core.polling.AbstractPollingIoProcessor.read(AbstractPollingIoProcessor.java:638) at org.apache.mina.core.polling.AbstractPollingIoProcessor.process(AbstractPollingIoProcessor.java:598) at org.apache.mina.core.polling.AbstractPollingIoProcessor.process(AbstractPollingIoProcessor.java:587) at org.apache.mina.core.polling.AbstractPollingIoProcessor.access$400(AbstractPollingIoProcessor.java:61) at org.apache.mina.core.polling.AbstractPollingIoProcessor$Processor.run(AbstractPollingIoProcessor.java:969) at org.apache.mina.util.NamePreservingRunnable.run(NamePreservingRunnable.java:64) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675) at java.lang.Thread.run(Thread.java:595) i can delay or avoid the exception by using the debugger and break to only send one message at a time. synchronizing the IoSession#write() on the client makes the problem disappear and everything works as expected, but at the cost of heavy performance loss. looks to me like a single internal buffer to hold partial data concurrently overridden by different threads, which corrupts the buffer and hence the server fails to decompress the data. i am using Java 1.5.0_16, running a localhost connection in two different JVMs. please look into this again.
        Hide
        Mauritz Lovgren added a comment -

        I agree, DIRMINA-675 seems very much like my issue, except that I experienced that messages were lost, not duplicated... but I concluded that the source was exactly the same spot in code that you have just fixed, so we keep this closed for now. I reopen the moment I find it to be a false true .

        Show
        Mauritz Lovgren added a comment - I agree, DIRMINA-675 seems very much like my issue, except that I experienced that messages were lost, not duplicated... but I concluded that the source was exactly the same spot in code that you have just fixed, so we keep this closed for now. I reopen the moment I find it to be a false true .
        Hide
        Emmanuel Lecharny added a comment -

        Ok, I can't resist

        Feel free tro reopen the issue if needed !

        Show
        Emmanuel Lecharny added a comment - Ok, I can't resist Feel free tro reopen the issue if needed !
        Hide
        Emmanuel Lecharny added a comment -

        I think that the fix for DIRMINA-675 should have corrected this issue. Can someone confirm ?

        Show
        Emmanuel Lecharny added a comment - I think that the fix for DIRMINA-675 should have corrected this issue. Can someone confirm ?
        Hide
        Emmanuel Lecharny added a comment -

        Most likely. This is what I was thinking about yesterday : the fix done in the encoder could have also fixed some of the pending issues, but I didn't had time to check the JIRAs.

        Mauritz, I don't know if you could test it against the latest trunk ?

        Thanks !

        Show
        Emmanuel Lecharny added a comment - Most likely. This is what I was thinking about yesterday : the fix done in the encoder could have also fixed some of the pending issues, but I didn't had time to check the JIRAs. Mauritz, I don't know if you could test it against the latest trunk ? Thanks !
        Hide
        Steve Ulrich added a comment -

        This could be related to DIRMINA-675. Does it appear with the latest trunk?

        Show
        Steve Ulrich added a comment - This could be related to DIRMINA-675 . Does it appear with the latest trunk?
        Hide
        Mauritz Lovgren added a comment -

        As a consequence of this I have been forced to perform encoding of large messages (in parallell) before writing them to the IoSession (I use compression on ObjectOutputStream'ed objects that, should it be performed inside a synchronized (session) would block the session for writing from other threads the entire encoding time (which could be a couple of hundred millis on larger messages!)

        In other words, I send pre-prepared ByteBuffers from pre-encoded messages to session.write instead of letting the Encoder do the encoding during the session.write.

        Sad, but true

        Show
        Mauritz Lovgren added a comment - As a consequence of this I have been forced to perform encoding of large messages (in parallell) before writing them to the IoSession (I use compression on ObjectOutputStream'ed objects that, should it be performed inside a synchronized (session) would block the session for writing from other threads the entire encoding time (which could be a couple of hundred millis on larger messages!) In other words, I send pre-prepared ByteBuffers from pre-encoded messages to session.write instead of letting the Encoder do the encoding during the session.write. Sad, but true
        Hide
        Emmanuel Lecharny added a comment -

        Definitively has to be fixed or dismissed for 2.0-RC1

        Show
        Emmanuel Lecharny added a comment - Definitively has to be fixed or dismissed for 2.0-RC1
        Hide
        Mauritz Lovgren added a comment -

        No, no executor filter, only a protocol codec filter and an IoHandler.

        As far as I can see, one explanation can be that the session.write has no protection for simultaneous update of the ProtocolEncoderOutputImpl (ENCODER_OUT) property. If multiple threads call write simultaneously, the property (holding the currently message to write) will suffer a race on update which might prevent messages from being written to the write queue.
        I might be wrong here, but I find no other simple explanation.

        Another curiosity is that should the message fail to be added to the write queue, no error is generated (the offer() method to the CyclicQueue could return false, which is just ignored).

        Show
        Mauritz Lovgren added a comment - No, no executor filter, only a protocol codec filter and an IoHandler. As far as I can see, one explanation can be that the session.write has no protection for simultaneous update of the ProtocolEncoderOutputImpl (ENCODER_OUT) property. If multiple threads call write simultaneously, the property (holding the currently message to write) will suffer a race on update which might prevent messages from being written to the write queue. I might be wrong here, but I find no other simple explanation. Another curiosity is that should the message fail to be added to the write queue, no error is generated (the offer() method to the CyclicQueue could return false, which is just ignored).
        Hide
        Emmanuel Lecharny added a comment -

        Are you using an executorFilter somewhere in the chain ?

        Show
        Emmanuel Lecharny added a comment - Are you using an executorFilter somewhere in the chain ?
        Hide
        Emmanuel Lecharny added a comment -

        Are you using an executorFilter somwhere in the chain ?

        Show
        Emmanuel Lecharny added a comment - Are you using an executorFilter somwhere in the chain ?

          People

          • Assignee:
            Unassigned
            Reporter:
            Mauritz Lovgren
          • Votes:
            1 Vote for this issue
            Watchers:
            5 Start watching this issue

            Dates

            • Created:
              Updated:
              Resolved:

              Development