KAFKA-7403

Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 2.1.0
    • Component/s: core
    • Labels: None

      Description

      I am currently testing a broker upgrade from 0.11 to 2.0 with some patches, including KIP-211/KAFKA-4682. After the upgrade, however, applications using 0.10.2 Kafka clients failed with the following error:

      2018/09/11 19:34:52.814 ERROR Failed to commit offsets. Exiting.
      org.apache.kafka.common.KafkaException: Unexpected error in commit: The server experienced an unexpected error when processing the request
      at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:784) ~[kafka-clients-0.10.2.86.jar:?]
      at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:722) ~[kafka-clients-0.10.2.86.jar:?]
      at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784) ~[kafka-clients-0.10.2.86.jar:?]
      at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765) ~[kafka-clients-0.10.2.86.jar:?]
      at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186) ~[kafka-clients-0.10.2.86.jar:?]
      at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149) ~[kafka-clients-0.10.2.86.jar:?]
      at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116) ~[kafka-clients-0.10.2.86.jar:?]
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493) ~[kafka-clients-0.10.2.86.jar:?]
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322) ~[kafka-clients-0.10.2.86.jar:?]
      at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253) ~[kafka-clients-0.10.2.86.jar:?]

      From my reading of the code, it looks like the following happened:

      1. The 0.10.2 client sends a v2 OffsetCommitRequest to the broker. It sets the retentionTime field of the OffsetCommitRequest to DEFAULT_RETENTION_TIME.
      2. In the 2.0 broker code, upon receiving an OffsetCommitRequest with DEFAULT_RETENTION_TIME, KafkaApis.handleOffsetCommitRequest() sets the "expireTimestamp" field of OffsetAndMetadata to None.
      3. Later in the code path, GroupMetadataManager.offsetCommitValue() expects OffsetAndMetadata to have a non-empty "expireTimestamp" field if the inter.broker.protocol.version is < KAFKA_2_1_IV0.
      4. However, the inter.broker.protocol.version was set to "1.0" prior to the upgrade, and as a result, the following code in offsetCommitValue() raises an error because expireTimestamp is None:
        value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp.get)
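      The interaction of steps 2 and 4 can be sketched in plain Java with `Optional` (a simplified illustration only; the class, method names, and sentinel value below are hypothetical stand-ins, not the actual broker code, which is Scala and uses `Option`/`None.get`):

      ```java
      import java.util.NoSuchElementException;
      import java.util.Optional;

      public class OffsetCommitSketch {
          // Sentinel an old client sends when it has no per-commit retention
          // (illustrative value; the real constant lives in the protocol code)
          public static final long DEFAULT_RETENTION_TIME = -1L;

          // Step 2: the broker maps the sentinel to an absent expire timestamp
          public static Optional<Long> expireTimestamp(long retentionTime, long now) {
              return retentionTime == DEFAULT_RETENTION_TIME
                      ? Optional.empty()
                      : Optional.of(now + retentionTime);
          }

          // Step 4: the pre-2.1 offset message format unconditionally reads the
          // timestamp, so an absent value throws (the "None.get" in the trace)
          public static long serializeExpireTimestampV1(Optional<Long> expireTimestamp) {
              return expireTimestamp.get();
          }

          public static void main(String[] args) {
              Optional<Long> ts = expireTimestamp(DEFAULT_RETENTION_TIME, System.currentTimeMillis());
              try {
                  serializeExpireTimestampV1(ts);
              } catch (NoSuchElementException e) {
                  System.out.println("broker-side failure: " + e);
              }
          }
      }
      ```

      The point of the sketch is that the two code paths disagree: one treats an absent expire timestamp as valid, the other assumes it is always present when the older inter-broker protocol version is in use.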

       

      Here is the stack trace for the broker-side error:

      java.util.NoSuchElementException: None.get
      at scala.None$.get(Option.scala:347) ~[scala-library-2.11.12.jar:?]
      at scala.None$.get(Option.scala:345) ~[scala-library-2.11.12.jar:?]
      at kafka.coordinator.group.GroupMetadataManager$.offsetCommitValue(GroupMetadataManager.scala:1109) ~[kafka_2.11-2.0.0.10.jar:?]
      at kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:326) ~[kafka_2.11-2.0.0.10.jar:?]
      at kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:324) ~[kafka_2.11-2.0.0.10.jar:?]
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[scala-library-2.11.12.jar:?]
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) ~[scala-library-2.11.12.jar:?]
      at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221) ~[scala-library-2.11.12.jar:?]
      at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428) ~[scala-library-2.11.12.jar:?]
      at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) ~[scala-library-2.11.12.jar:?]
      at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[scala-library-2.11.12.jar:?]
      at kafka.coordinator.group.GroupMetadataManager.storeOffsets(GroupMetadataManager.scala:324) ~[kafka_2.11-2.0.0.10.jar:?]
      at kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply$mcV$sp(GroupCoordinator.scala:521) ~[kafka_2.11-2.0.0.10.jar:?]
      at kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply(GroupCoordinator.scala:506) ~[kafka_2.11-2.0.0.10.jar:?]
      at kafka.coordinator.group.GroupCoordinator$$anonfun$doCommitOffsets$1.apply(GroupCoordinator.scala:506) ~[kafka_2.11-2.0.0.10.jar:?]
      at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) ~[kafka_2.11-2.0.0.10.jar:?]
      at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:193) ~[kafka_2.11-2.0.0.10.jar:?]
      at kafka.coordinator.group.GroupCoordinator.doCommitOffsets(GroupCoordinator.scala:505) ~[kafka_2.11-2.0.0.10.jar:?]
      at kafka.coordinator.group.GroupCoordinator.handleCommitOffsets(GroupCoordinator.scala:484) ~[kafka_2.11-2.0.0.10.jar:?]
      at kafka.server.KafkaApis.handleOffsetCommitRequest(KafkaApis.scala:359) ~[kafka_2.11-2.0.0.10.jar:?]
      at kafka.server.KafkaApis.handle(KafkaApis.scala:114) ~[kafka_2.11-2.0.0.10.jar:?]
      at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) ~[kafka_2.11-2.0.0.10.jar:?]
      at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
      

       

      I was able to reproduce the error by passing KAFKA_0_11_0_IV2 as the ApiVersion (the second constructor parameter) to GroupMetadataManager in GroupMetadataManagerTest.scala.

       

      Vahid Hashemian, the error came from the code added for KAFKA-4682. Can you take a look and confirm whether this is indeed an issue?

              People

              • Assignee: Vahid Hashemian
              • Reporter: Jon Lee
