  1. Kafka
  2. KAFKA-9092

Kafka Streams upgrade: "Magic v0 does not support record headers"

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 1.0.0, 2.0.0
    • Fix Version/s: None
    • Component/s: streams
    • Labels:
      None
    • Environment:
      kafka 1.0.0
      kafka streams lib 2.0.0

      Description

      My Kafka cluster is version 1.0.0, and I run a Streams app against it that writes to topic v1.

      I then upgraded the Kafka Streams library to 2.0.0 because I want to use some of the new APIs.

      I followed the upgrade guide: http://kafka.apache.org/20/documentation/streams/upgrade-guide

       

      <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.0.0</version>
      </dependency>

       

      // streams config
      settings.put(StreamsConfig.UPGRADE_FROM_CONFIG, "1.0");
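
      For context, a minimal sketch of how that setting sits next to the other required Streams settings. It uses plain string keys so it compiles without the Kafka jars; "upgrade.from" is the key behind StreamsConfig.UPGRADE_FROM_CONFIG. The application id and bootstrap address are placeholder assumptions, not values from my setup.

```java
import java.util.Properties;

class UpgradeConfigSketch {
    // Builds the settings passed to the Streams app during the upgrade.
    public static Properties buildSettings() {
        Properties settings = new Properties();
        // Placeholder values (assumptions for illustration only).
        settings.put("application.id", "my-streams-app");
        settings.put("bootstrap.servers", "localhost:9092");
        // Same key as StreamsConfig.UPGRADE_FROM_CONFIG: the version being upgraded from.
        settings.put("upgrade.from", "1.0");
        return settings;
    }

    public static void main(String[] args) {
        Properties settings = buildSettings();
        System.out.println("upgrade.from = " + settings.getProperty("upgrade.from"));
    }
}
```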

       

      Then I stopped the Streams app, rebuilt the jar, and started it again.

      It runs without problems at first. After a few hours the Kafka broker logs an error. I deleted and recreated the topic, but after a few more hours the broker logged the same error:

      [2019-10-24 10:15:45,780] ERROR [KafkaApi-109] Error when handling request {replica_id=-1,max_wait_time=100,min_bytes=1,topics=[{topic=xxx,partitions=[{partition=4,fetch_offset=3165633,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
      java.lang.IllegalArgumentException: Magic v0 does not support record headers
        at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:403)
        at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:586)
        at org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:134)
        at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:109)
        at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:253)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:520)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:518)
        at scala.Option.map(Option.scala:146)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:518)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:508)
        at scala.Option.flatMap(Option.scala:171)
        at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:508)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:556)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:555)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:555)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:569)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:569)
        at kafka.server.KafkaApis$$anonfun$sendResponseMaybeThrottle$1.apply$mcVI$sp(KafkaApis.scala:2034)
        at kafka.server.ClientRequestQuotaManager.maybeRecordAndThrottle(ClientRequestQuotaManager.scala:52)
        at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2033)
        at kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:569)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:588)
        at kafka.server.ClientQuotaManager.maybeRecordAndThrottle(ClientQuotaManager.scala:175)
        at kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback$1(KafkaApis.scala:587)
        at kafka.server.KafkaApis$$anonfun$handleFetchRequest$3.apply(KafkaApis.scala:604)
        at kafka.server.KafkaApis$$anonfun$handleFetchRequest$3.apply(KafkaApis.scala:604)
        at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:820)
        at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:596)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:100)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:65)
        at java.lang.Thread.run(Thread.java:744)
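
      Reading the trace, the exception is thrown while the broker down-converts stored records (downConvert, then appendWithOffset) to the older message format, magic v0, to answer a fetch request. As far as I understand, record headers only exist from message format v2 onward, so a record that carries headers cannot be rewritten into v0. The sketch below is a simplified model of that check, not the broker's actual code; the class and method names are hypothetical.

```java
class DownConvertSketch {
    // Record headers were introduced with message format (magic) v2.
    static final byte MAGIC_V2 = 2;

    // Rejects an append when the target format is older than v2 and the record has headers.
    public static void checkAppend(byte targetMagic, int headerCount) {
        if (targetMagic < MAGIC_V2 && headerCount > 0) {
            throw new IllegalArgumentException(
                "Magic v" + targetMagic + " does not support record headers");
        }
    }

    public static void main(String[] args) {
        checkAppend((byte) 2, 1); // v2 target: headers are allowed
        checkAppend((byte) 0, 0); // v0 target without headers: allowed
        try {
            checkAppend((byte) 0, 1); // v0 target with headers: rejected
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

      If this model is right, the error needs two things at once: records with headers in the topic, and a fetch that asks for the v0 format.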

       

      I found this related issue: https://issues.apache.org/jira/browse/KAFKA-6739?src=confmacro

      Is this a bug in Kafka 1.0.0?

              People

              • Assignee:
                Unassigned
              • Reporter:
                wxmimperio
              • Votes:
                0
              • Watchers:
                4
