Kafka / KAFKA-2308

New producer + Snappy face un-compression errors after broker restart

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.9.0.0, 0.8.2.2
    • Component/s: None
    • Labels: None

      Description

      Looks like the new producer, when used with Snappy, sends messages after a broker restart that the brokers can't decompress. This issue was discussed in a few mailing list threads, but I don't think we ever resolved it.

      I can reproduce with trunk and Snappy 1.1.1.7.

      To reproduce:
      1. Start 3 brokers.
      2. Create a topic with 3 partitions and 3 replicas.
      3. Start the performance producer with --new-producer --compression-codec 2 (set the number of messages fairly high to give yourself time; I went with 10M). A roughly equivalent standalone producer sketch is included after the client log below.
      4. Bounce one of the brokers.
      5. The log of one of the surviving nodes should contain errors like:

      2015-07-02 13:45:59,300 ERROR kafka.server.ReplicaManager: [Replica Manager on Broker 66]: Error processing append operation on partition [t3,0]
      kafka.common.KafkaException:
              at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:94)
              at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:64)
              at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
              at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
              at kafka.message.ByteBufferMessageSet$$anon$2.innerDone(ByteBufferMessageSet.scala:177)
              at kafka.message.ByteBufferMessageSet$$anon$2.makeNext(ByteBufferMessageSet.scala:218)
              at kafka.message.ByteBufferMessageSet$$anon$2.makeNext(ByteBufferMessageSet.scala:173)
              at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
              at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
              at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
              at scala.collection.Iterator$class.foreach(Iterator.scala:727)
              at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
              at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
              at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
              at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
              at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
              at scala.collection.AbstractIterator.to(Iterator.scala:1157)
              at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
              at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
              at kafka.message.ByteBufferMessageSet.validateMessagesAndAssignOffsets(ByteBufferMessageSet.scala:267)
              at kafka.log.Log.liftedTree1$1(Log.scala:327)
              at kafka.log.Log.append(Log.scala:326)
              at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:423)
              at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:409)
              at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
              at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:268)
              at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:409)
              at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:365)
              at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:350)
              at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
              at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
              at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
              at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
              at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
              at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
              at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
              at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
              at scala.collection.AbstractTraversable.map(Traversable.scala:105)
              at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:350)
              at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:286)
              at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:270)
              at kafka.server.KafkaApis.handle(KafkaApis.scala:57)
              at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
              at java.lang.Thread.run(Thread.java:745)
      Caused by: java.io.IOException: PARSING_ERROR(2)
              at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84)
              at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
              at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:594)
              at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:358)
              at org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:167)
              at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:150)
              at java.io.DataInputStream.readFully(DataInputStream.java:195)
              at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:82)
              ... 43 more
      

      The client has the following messages:

      [2015-07-02 13:46:00,478] ERROR Error when sending message to topic t3 with key: 4 bytes, value: 100 bytes with error: The server experienced an unexpected error when processing the request (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
      java: target/snappy-1.1.1/snappy.cc:423: char* snappy::internal::CompressFragment(const char*, size_t, char*, snappy::uint16*, int): Assertion `0 == memcmp(base, candidate, matched)' failed.
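
      For reference, step 3 can be approximated with a minimal standalone loop against the new producer. This is only a sketch: the class name, broker list, and topic name are placeholders, and the key/value sizes simply mirror the client log above.

      import java.util.Properties;
      import org.apache.kafka.clients.producer.Callback;
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.ProducerRecord;
      import org.apache.kafka.clients.producer.RecordMetadata;

      public class SnappyRestartRepro {                 // hypothetical class name
          public static void main(String[] args) {
              Properties props = new Properties();
              props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092"); // placeholder broker list
              props.put("compression.type", "snappy");  // codec 2 in the perf tool
              props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
              props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

              KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
              byte[] key = new byte[4];     // 4-byte key, matching the client log above
              byte[] value = new byte[100]; // 100-byte value, matching the client log above
              for (long i = 0; i < 10000000L; i++) {
                  producer.send(new ProducerRecord<byte[], byte[]>("t3", key, value), new Callback() {
                      public void onCompletion(RecordMetadata metadata, Exception exception) {
                          if (exception != null)
                              exception.printStackTrace(); // send errors surface here after a broker bounce
                      }
                  });
              }
              producer.close();
          }
      }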
      

        Activity

        Gwen Shapira added a comment -

        I kinda know why it happens; a patch will take a bit of time since I can't reproduce the error in a unit test (although it reproduces nicely in a test environment). I'll put a preliminary patch without unit tests up in a sec, so people suffering from this issue can validate it. Here's what I found:

        When we get a retriable error in the producer (NETWORK_EXCEPTION, for instance), the current record batch gets put back at the front of its topic-partition batch queue by completeBatch().
        The next time Sender runs, it drains the queue, and one of the things it does is take the first batch from the queue and close() it. But if a batch was re-queued, it was already closed. Calling close() twice should be safe, and for uncompressed messages it is. However, for compressed messages the logic in close() is rather complex, and I believe closing a batch twice corrupts the record. I can't tell exactly where the close() logic becomes unsafe, but there's really no need to close a batch twice: MemoryRecords.close() can check whether it is writable before closing, and only close() the record if it is writable. This guarantees closing happens just once.

        Fixing this resolved the problem on my system.
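
        A minimal sketch of the guard described above, assuming a hypothetical MemoryRecords shape with a writable flag and a compressor field (the names here are illustrative, not the actual trunk code):

        // Sketch only: field and method names are illustrative, not the real MemoryRecords.
        public class MemoryRecords {
            private final Compressor compressor; // hypothetical handle to the compression stream
            private boolean writable = true;     // becomes false once the records are closed

            public void close() {
                if (writable) {             // guard: only close once
                    compressor.close();     // flush compressed data and finalize the wrapper message
                    writable = false;
                }
                // a second close() on a re-queued batch is now a no-op
            }
        }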

        Guozhang Wang added a comment -

        Hi Gwen Shapira, thanks for the findings. Yes, for a compressed message set the close() call will trigger Compressor.close(), which will:

        1. Close the compression output stream, which will likely write the left-over cached bytes to the underlying buffer.
        2. Set the wrapper message header accordingly: the offset (the number of compressed messages - 1), the length, and the CRC.

        The second step should be OK to execute twice (and if not, gzip should have a similar issue), but it seems that for the first step, calling stream.close() multiple times on Snappy may be problematic. To verify that, we can write some simple test code:

        int blockSize = 32 * 1024;                // Snappy block size in bytes
        byte[] someBytes = new byte[1000];        // some bytes to write to the stream
        java.io.ByteArrayOutputStream buffer = new java.io.ByteArrayOutputStream();
        org.xerial.snappy.SnappyOutputStream stream =
                new org.xerial.snappy.SnappyOutputStream(buffer, blockSize);
        stream.write(someBytes);
        stream.close();
        stream.close(); // close again, as happens with a re-queued batch
        
        Gwen Shapira added a comment - - edited

        Thanks for your comments, Guozhang Wang.

        Closing multiple times on Snappy is not an issue. Actually, even draining, requeueing and draining again is not an issue by itself.
        That is, I'm still unable to create a test that replicates this error, even though it reproduces nicely in a real cluster with the performance producer.

        I have a patch that I'm fairly certain fixes the problem (although I cannot say exactly why). I'll attach it here, because someone may need it, and keep digging into when and why a double-close corrupts messages.

        Here's a link to a test that should have caused an issue, but doesn't:
        https://gist.github.com/gwenshap/1ec9cb55d704a82477d8

        Gwen Shapira added a comment -

        Created reviewboard https://reviews.apache.org/r/36290/diff/
        against branch trunk

        Gwen Shapira added a comment -

        Actually, it looks likely that this is a Snappy bug (even though I can't reproduce it):
        https://github.com/xerial/snappy-java/pull/108

        Note that this fix is not in 1.1.1.7 (the version we are using).

        I suggest pushing our simple workaround (since it's simple, and nothing bad can happen from only closing once).

        Ewen Cheslack-Postava added a comment -

        Gwen Shapira, the test case you gave doesn't quite do enough to trigger the bug. It releases the same buffer twice, but doesn't reuse it. I think you'd need to get the test to do something more like the following (see the sketch after this list):

        • Fill the first record batch (batch 1) with records and drain it (causing its buffer to be released).
        • At least start creating another batch (batch 2). This allocates the released buffer to that batch.
        • Re-enqueue batch 1 and drain again (causing the same buffer to be released a second time).
        • Continue enqueuing until a third batch (batch 3) is created, which allocates the buffer yet again.
        • Drain batches 2 and 3 and validate their contents.
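
        A skeleton of that test, with the scenario captured as comments (the concrete calls depend on the RecordAccumulator/Sender test utilities of the producer code at the time, so none are assumed here; the class and method names are hypothetical):

        import org.junit.Test;

        public class ReusedBufferCorruptionTest {   // hypothetical test class
            @Test
            public void reenqueuedBatchDoesNotCorruptReusedBuffer() {
                // 1. Fill the first record batch (batch 1) with compressed records and drain it,
                //    so its buffer is released back to the BufferPool.
                // 2. Start another batch (batch 2); it is allocated the buffer just released.
                // 3. Re-enqueue batch 1 (as the Sender does after a retriable error) and drain again,
                //    releasing the same buffer a second time.
                // 4. Keep appending until a third batch (batch 3) is created, reusing the buffer yet again.
                // 5. Drain batches 2 and 3 and assert that their compressed contents decompress cleanly.
            }
        }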
        Gwen Shapira added a comment -

        Yes, I saw the Snappy test case too.

        Since it's a confirmed Snappy bug, I don't think we need a Kafka test case. We can just protect that call, right?

        Guozhang Wang added a comment -

        Agree, we do not need a test case inside Kafka code.

        Gwen Shapira added a comment -

        Was that a "ship it", Guozhang Wang?

        Guozhang Wang added a comment -

        I was unit testing the patch while writing the last comment. Just shipped it and committed to trunk.

        Gwen Shapira added a comment -

        Thanks

        Allen Wang added a comment -

        Gwen Shapira, Guozhang Wang: Is the fix in the producer only? If I take the 0.8.2.2 producer, do I also need to upgrade the broker/consumer to 0.8.2.2 or to a later Snappy version in order to avoid this bug? Currently our brokers are on 0.8.2.1 and Snappy 1.1.1.6.

        Guozhang Wang added a comment -

        Allen Wang: This bug should only affect the producer, due to its usage pattern of Snappy.

        ASF GitHub Bot added a comment -

        GitHub user darionyaphet commented on the pull request:

        https://github.com/apache/storm/pull/801#issuecomment-161509467

        Hi @knusbaum @revans2, I read the `Kafka Release Notes Version 0.8.2.2` and found a fixed bug (KAFKA-2308, https://issues.apache.org/jira/browse/KAFKA-2308) about new producer and Snappy un-compression errors when a Kafka broker restarts. So I think this may be useful.


          People

          • Assignee: Gwen Shapira
          • Reporter: Gwen Shapira
          • Votes: 0
          • Watchers: 5
