KAFKA-273: Occasional GZIP errors on the server while writing compressed data to disk

    Details

    • Type: Bug
    • Status: Patch Available
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.7
    • Fix Version/s: None
    • Component/s: core
    • Labels: None

      Description

      Occasionally, we see the following errors on the Kafka server -

      2012/02/08 14:58:21.832 ERROR [KafkaRequestHandlers] [kafka-processor-6] [kafka] Error processing MultiProducerRequest on NusImpressionSetEvent:0
      java.io.EOFException: Unexpected end of ZLIB input stream
      at java.util.zip.InflaterInputStream.fill(InflaterInputStream.java:223)
      at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:141)
      at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:92)
      at java.io.FilterInputStream.read(FilterInputStream.java:90)
      at kafka.message.GZIPCompression.read(CompressionUtils.scala:52)
      at kafka.message.CompressionUtils$$anonfun$decompress$1.apply$mcI$sp(CompressionUtils.scala:143)
      at kafka.message.CompressionUtils$$anonfun$decompress$1.apply(CompressionUtils.scala:143)
      at kafka.message.CompressionUtils$$anonfun$decompress$1.apply(CompressionUtils.scala:143)
      at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:598)
      at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:598)
      at scala.collection.immutable.Stream$Cons.tail(Stream.scala:555)
      at scala.collection.immutable.Stream$Cons.tail(Stream.scala:549)
      at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:394)
      at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:394)
      at scala.collection.immutable.Stream$Cons.tail(Stream.scala:555)
      at scala.collection.immutable.Stream$Cons.tail(Stream.scala:549)
      at scala.collection.immutable.Stream.foreach(Stream.scala:255)
      at kafka.message.CompressionUtils$.decompress(CompressionUtils.scala:143)
      at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:119)
      at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:132)
      at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:81)
      at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
      at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
      at scala.collection.Iterator$class.foreach(Iterator.scala:631)
      at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
      at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
      at kafka.message.MessageSet.foreach(MessageSet.scala:87)
      at kafka.log.Log.append(Log.scala:204)
      at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:70)
      at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:63)
      at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:63)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
      at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
      at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
      at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
      at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
      at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:63)
      at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:42)
      at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:42)
      at kafka.network.Processor.handle(SocketServer.scala:297)
      at kafka.network.Processor.read(SocketServer.scala:320)
      at kafka.network.Processor.run(SocketServer.scala:215)
      at java.lang.Thread.run(Thread.java:619)

      Attachments

      • kafka-273.patch (8 kB, Neha Narkhede)

        Issue Links

          This issue incorporates KAFKA-274
          This issue incorporates KAFKA-275

          Activity

          Neha Narkhede created issue
          Neha Narkhede added a comment -

          We've seen this error very occasionally. In our deployments, the Deflater side uses jdk-1.6.0_21 and zlib-1.2.3 on Linux, and jdk-1.6.0_16 and zlib-1.2.3 on Solaris; the Inflater side uses jdk-1.6.0_21 and zlib-1.2.3 on Linux.

          According to this Java bug - http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6519463 - the bug is fixed by the combination of zlib-1.2.3 and jdk7-b72.

          Here is the code snippet from InflaterInputStream.java:

              if (inf.needsInput()) {
                  fill();
              }

          This bug occurs when the native Inflater on the platform indicates that there are more bytes to decompress when there aren't any. Based on that, InflaterInputStream.read() calls fill(), which then throws an EOFException.

          The workaround seems to be catching the EOFException in CompressionUtils.decompress and doing nothing.
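
          A minimal sketch of that workaround (illustrative only; the helper name readAllTolerantly is hypothetical, and the real change belongs in CompressionUtils.decompress, as in the attached patch):

              import java.io.{ByteArrayOutputStream, EOFException, InputStream}

              // Hypothetical sketch: treat the spurious EOFException thrown by the
              // buggy native Inflater as a normal end-of-stream (-1), so the
              // decompression loop terminates cleanly instead of failing the request.
              def readAllTolerantly(in: InputStream): Array[Byte] = {
                val out = new ByteArrayOutputStream()
                val buffer = new Array[Byte](1024)

                def safeRead(): Int =
                  try in.read(buffer)
                  catch { case _: EOFException => -1 } // JDK bug 6519463: bogus EOF near end of stream

                Stream.continually(safeRead()).takeWhile(_ > 0).foreach { n =>
                  out.write(buffer, 0, n)
                }
                out.toByteArray
              }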

          Jun Rao added a comment -

          We have also seen the following GZIP issue. Is it the same issue, given that it's not triggered by EOF?

          ERROR [CompressionUtils$] [kafka-processor-0] [kafka] Error while reading from the GZIP input stream
          java.io.IOException: Corrupt GZIP trailer
          at java.util.zip.GZIPInputStream.readTrailer(GZIPInputStream.java:182)
          at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:94)
          at java.io.FilterInputStream.read(FilterInputStream.java:90)
          at kafka.message.GZIPCompression.read(CompressionUtils.scala:52)
          at kafka.message.CompressionUtils$$anonfun$decompress$1.apply$mcI$sp(CompressionUtils.scala:143)
          at kafka.message.CompressionUtils$$anonfun$decompress$1.apply(CompressionUtils.scala:143)
          at kafka.message.CompressionUtils$$anonfun$decompress$1.apply(CompressionUtils.scala:143)
          at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:598)
          at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:598)
          at scala.collection.immutable.Stream$Cons.tail(Stream.scala:555)
          at scala.collection.immutable.Stream$Cons.tail(Stream.scala:549)
          at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:394)
          at scala.collection.immutable.Stream$$anonfun$takeWhile$1.apply(Stream.scala:394)
          at scala.collection.immutable.Stream$Cons.tail(Stream.scala:555)
          at scala.collection.immutable.Stream$Cons.tail(Stream.scala:549)
          at scala.collection.immutable.Stream.foreach(Stream.scala:255)
          at kafka.message.CompressionUtils$.decompress(CompressionUtils.scala:143)
          at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:119)
          at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:132)
          at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:81)
          at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
          at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
          at scala.collection.Iterator$class.foreach(Iterator.scala:631)
          at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
          at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
          at kafka.message.MessageSet.foreach(MessageSet.scala:87)
          at kafka.log.Log.append(Log.scala:204)
          at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:70)
          at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:63)
          at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:63)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
          at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
          at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
          at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
          at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
          at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:63)
          at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:42)
          at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:42)
          at kafka.network.Processor.handle(SocketServer.scala:297)
          at kafka.network.Processor.read(SocketServer.scala:320)
          at kafka.network.Processor.run(SocketServer.scala:215)
          at java.lang.Thread.run(Thread.java:619)

          Neha Narkhede changed status: Open → In Progress
          Neha Narkhede added a comment -

          Changed CompressionUtils.decompress to handle EOFException and return -1 from the read API

          Neha Narkhede attached kafka-273.patch
          Neha Narkhede changed status: In Progress → Patch Available
          Neha Narkhede added a comment -

          Jun, I am not sure it's the same issue. See this - http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4262583

          The Java bugs mention that it happens with very large compressed or uncompressed data (larger than 2 GB). I am not sure how the Kafka server could get into that situation, since the maximum request size is 100 MB.

          Jun Rao added a comment -

          The patch for EOF looks fine. We probably need to run some system tests to make sure this doesn't introduce new problems, especially when the compressed size is relatively large. Once that testing is done, we can commit the patch.
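
          A standalone round-trip check in that spirit (illustrative only, not the project's actual system test) would compress a relatively large random payload, decompress it, and verify that nothing was truncated, since an over-eager EOF catch could otherwise hide real data loss:

              import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
              import java.util.Arrays
              import java.util.zip.{GZIPInputStream, GZIPOutputStream}
              import scala.util.Random

              // Hypothetical sanity check: a gzip round trip over a large payload
              // must return every byte; swallowing a spurious EOFException must
              // not silently truncate the decompressed data.
              object GzipRoundTripCheck extends App {
                val payload = new Array[Byte](10 * 1024 * 1024) // 10 MB of random bytes
                Random.nextBytes(payload)

                val compressed = {
                  val bos = new ByteArrayOutputStream()
                  val gzip = new GZIPOutputStream(bos)
                  gzip.write(payload)
                  gzip.close()
                  bos.toByteArray
                }

                val decompressed = {
                  val in  = new GZIPInputStream(new ByteArrayInputStream(compressed))
                  val out = new ByteArrayOutputStream()
                  val buf = new Array[Byte](1024)
                  var n = in.read(buf)
                  while (n > 0) { out.write(buf, 0, n); n = in.read(buf) }
                  out.toByteArray
                }

                assert(Arrays.equals(decompressed, payload), "gzip round trip lost data")
                println("gzip round trip OK: " + decompressed.length + " bytes")
              }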

          Neha Narkhede added a comment -

          I have run the system test with message size = 100K, batch size = 200, and compression turned on. It passed.
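
          For context, a producer configuration along those lines might look as follows; the property names are assumptions based on the 0.7-era settings, not a copy of the actual test config:

              import java.util.Properties

              // Hypothetical 0.7-era producer properties to exercise the gzip path;
              // names and values are assumptions, not taken from the actual test.
              val props = new Properties()
              props.put("zk.connect", "localhost:2181")
              props.put("serializer.class", "kafka.serializer.StringEncoder")
              props.put("producer.type", "async")  // batch messages on the client
              props.put("batch.size", "200")       // batch size used in the test
              props.put("compression.codec", "1")  // 1 = GZIP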

          Jay Kreps added a comment -

          Is this still happening?


            People

            • Assignee: Neha Narkhede
            • Reporter: Neha Narkhede
            • Votes: 1
            • Watchers: 2
