Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-9203

kafka-client 2.3.1 fails to consume lz4 compressed topic

Attach filesAttach ScreenshotVotersWatch issueWatchersCreate sub-taskLinkCloneUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments
    XMLWordPrintableJSON

Details

    • Bug
    • Status: Resolved
    • Blocker
    • Resolution: Fixed
    • 2.3.0, 2.3.1
    • 2.4.0, 2.3.2
    • compression, consumer
    • None

    Description

      I run kafka cluster 2.1.1

      when I upgraded the consumer app to use kafka-client 2.3.0 (or 2.3.1) instead of 2.2.0, I immediately started getting the following exceptions in a loop when consuming a topic with LZ4-compressed messages:

      2019-11-18 11:59:16.888 ERROR [afka-consumer-thread]     com.avast.utils2.kafka.consumer.ConsumerThread: Unexpected exception occurred while polling and processing messages: org.apache.kafka.common.KafkaExce
      ption: Received exception when fetching the next record from FILEREP_LONER_REQUEST-4. If needed, please seek past the record to continue consumption. 
      org.apache.kafka.common.KafkaException: Received exception when fetching the next record from FILEREP_LONER_REQUEST-4. If needed, please seek past the record to continue consumption. 
              at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1473) 
              at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1332) 
              at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645) 
              at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606) 
              at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1263) 
              at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225) 
              at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201) 
              at com.avast.utils2.kafka.consumer.ConsumerThread.pollAndProcessMessagesFromKafka(ConsumerThread.java:180) 
              at com.avast.utils2.kafka.consumer.ConsumerThread.run(ConsumerThread.java:149) 
              at com.avast.filerep.saver.RequestSaver$.$anonfun$main$8(RequestSaver.scala:28) 
              at com.avast.filerep.saver.RequestSaver$.$anonfun$main$8$adapted(RequestSaver.scala:19) 
              at resource.AbstractManagedResource.$anonfun$acquireFor$1(AbstractManagedResource.scala:88) 
              at scala.util.control.Exception$Catch.$anonfun$either$1(Exception.scala:252) 
              at scala.util.control.Exception$Catch.apply(Exception.scala:228) 
              at scala.util.control.Exception$Catch.either(Exception.scala:252) 
              at resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:88) 
              at resource.ManagedResourceOperations.apply(ManagedResourceOperations.scala:26) 
              at resource.ManagedResourceOperations.apply$(ManagedResourceOperations.scala:26) 
              at resource.AbstractManagedResource.apply(AbstractManagedResource.scala:50) 
              at resource.ManagedResourceOperations.acquireAndGet(ManagedResourceOperations.scala:25) 
              at resource.ManagedResourceOperations.acquireAndGet$(ManagedResourceOperations.scala:25) 
              at resource.AbstractManagedResource.acquireAndGet(AbstractManagedResource.scala:50) 
              at resource.ManagedResourceOperations.foreach(ManagedResourceOperations.scala:53) 
              at resource.ManagedResourceOperations.foreach$(ManagedResourceOperations.scala:53) 
              at resource.AbstractManagedResource.foreach(AbstractManagedResource.scala:50) 
              at com.avast.filerep.saver.RequestSaver$.$anonfun$main$6(RequestSaver.scala:19) 
              at com.avast.filerep.saver.RequestSaver$.$anonfun$main$6$adapted(RequestSaver.scala:18) 
              at resource.AbstractManagedResource.$anonfun$acquireFor$1(AbstractManagedResource.scala:88) 
              at scala.util.control.Exception$Catch.$anonfun$either$1(Exception.scala:252) 
              at scala.util.control.Exception$Catch.apply(Exception.scala:228) 
              at scala.util.control.Exception$Catch.either(Exception.scala:252) 
              at resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:88) 
              at resource.ManagedResourceOperations.apply(ManagedResourceOperations.scala:26) 
              at resource.ManagedResourceOperations.apply$(ManagedResourceOperations.scala:26) 
              at resource.AbstractManagedResource.apply(AbstractManagedResource.scala:50) 
              at resource.ManagedResourceOperations.acquireAndGet(ManagedResourceOperations.scala:25) 
              at resource.ManagedResourceOperations.acquireAndGet$(ManagedResourceOperations.scala:25) 
              at resource.AbstractManagedResource.acquireAndGet(AbstractManagedResource.scala:50) 
              at resource.ManagedResourceOperations.foreach(ManagedResourceOperations.scala:53) 
              at resource.ManagedResourceOperations.foreach$(ManagedResourceOperations.scala:53) 
              at resource.AbstractManagedResource.foreach(AbstractManagedResource.scala:50) 
              at com.avast.filerep.saver.RequestSaver$.$anonfun$main$4(RequestSaver.scala:18) 
              at com.avast.filerep.saver.RequestSaver$.$anonfun$main$4$adapted(RequestSaver.scala:17) 
              at resource.AbstractManagedResource.$anonfun$acquireFor$1(AbstractManagedResource.scala:88) 
              at scala.util.control.Exception$Catch.$anonfun$either$1(Exception.scala:252) 
              at scala.util.control.Exception$Catch.apply(Exception.scala:228) 
              at scala.util.control.Exception$Catch.either(Exception.scala:252) 
              at resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:88) 
              at resource.ManagedResourceOperations.apply(ManagedResourceOperations.scala:26) 
              at resource.ManagedResourceOperations.apply$(ManagedResourceOperations.scala:26) 
              at resource.AbstractManagedResource.apply(AbstractManagedResource.scala:50) 
              at resource.ManagedResourceOperations.acquireAndGet(ManagedResourceOperations.scala:25) 
              at resource.ManagedResourceOperations.acquireAndGet$(ManagedResourceOperations.scala:25) 
              at resource.AbstractManagedResource.acquireAndGet(AbstractManagedResource.scala:50) 
              at resource.ManagedResourceOperations.foreach(ManagedResourceOperations.scala:53) 
              at resource.ManagedResourceOperations.foreach$(ManagedResourceOperations.scala:53) 
              at resource.AbstractManagedResource.foreach(AbstractManagedResource.scala:50) 
              at com.avast.filerep.saver.RequestSaver$.$anonfun$main$2(RequestSaver.scala:17) 
              at com.avast.filerep.saver.RequestSaver$.$anonfun$main$2$adapted(RequestSaver.scala:16) 
              at resource.AbstractManagedResource.$anonfun$acquireFor$1(AbstractManagedResource.scala:88) 
              at scala.util.control.Exception$Catch.$anonfun$either$1(Exception.scala:252) 
              at scala.util.control.Exception$Catch.apply(Exception.scala:228) 
              at scala.util.control.Exception$Catch.either(Exception.scala:252) 
              at resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:88) 
              at resource.ManagedResourceOperations.apply(ManagedResourceOperations.scala:26) 
              at resource.ManagedResourceOperations.apply$(ManagedResourceOperations.scala:26) 
              at resource.AbstractManagedResource.apply(AbstractManagedResource.scala:50) 
              at resource.ManagedResourceOperations.acquireAndGet(ManagedResourceOperations.scala:25) 
              at resource.ManagedResourceOperations.acquireAndGet$(ManagedResourceOperations.scala:25) 
              at resource.AbstractManagedResource.acquireAndGet(AbstractManagedResource.scala:50) 
              at resource.ManagedResourceOperations.foreach(ManagedResourceOperations.scala:53) 
              at resource.ManagedResourceOperations.foreach$(ManagedResourceOperations.scala:53) 
              at resource.AbstractManagedResource.foreach(AbstractManagedResource.scala:50) 
              at com.avast.filerep.saver.RequestSaver$.main(RequestSaver.scala:16) 
              at com.avast.filerep.saver.RequestSaver.main(RequestSaver.scala) 
      Caused by: org.apache.kafka.common.KafkaException: java.io.IOException: Stream frame descriptor corrupted 
              at org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:113) 
              at org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:257) 
              at org.apache.kafka.common.record.DefaultRecordBatch.streamingIterator(DefaultRecordBatch.java:335) 
              at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.nextFetchedRecord(Fetcher.java:1450) 
              at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1487) 
              at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1332) 
              at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645) 
              at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606) 
              at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1294) 
              ... 70 common frames omitted 
      Caused by: java.io.IOException: Stream frame descriptor corrupted 
              at org.apache.kafka.common.record.KafkaLZ4BlockInputStream.readHeader(KafkaLZ4BlockInputStream.java:132) 
              at org.apache.kafka.common.record.KafkaLZ4BlockInputStream.<init>(KafkaLZ4BlockInputStream.java:78) 
              at org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:110) 
              ... 78 common frames omitted
      
      

       (the producer app is using kafka client 0.10.2.1)

      I retried with a new consumer group but it didn't help. Kafka-client downgrade back to 2.2.0 helped. This makes me think LZ4 may be broken in kafka-client 2.3.x

      Attachments

        Activity

          This comment will be Viewable by All Users Viewable by All Users
          Cancel

          People

            ijuma Ismael Juma
            dwatzke David Watzke
            Votes:
            0 Vote for this issue
            Watchers:
            5 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved:

              Slack

                Issue deployment