  Kafka / KAFKA-8374

KafkaApis.handleLeaderAndIsrRequest not robust to ZooKeeper exceptions


    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.0.1
    • Fix Version/s: None
    • Component/s: core, offset manager
    • Labels: None
    • Environment: Linux x86_64 (Ubuntu Xenial) running on AWS EC2

      Description

      Summary of bug (theory)

      During a leader election, when a broker is transitioning from leader to follower on some __consumer_offsets partitions and some __transaction_state partitions, a ZooKeeper exception can be thrown, leaving the GroupMetadataManager in an inconsistent state.

       
      In particular, in the onLeadershipChange callback of KafkaApis.handleLeaderAndIsrRequest, TransactionCoordinator.handleTxnEmigration can throw ZooKeeperClientExpiredException, ending the updatedFollowers.foreach loop early. If any __consumer_offsets partitions were due to be handled later in the loop, GroupMetadataManager is left with stale data in its groupMetadataCache. Later, when this broker resumes leadership for the affected __consumer_offsets partitions, it fails to load the updated groups into the cache because loading uses putIfNotExists, and it serves stale offsets to consumers.
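
      The early-exit behaviour described above can be modelled with a short sketch. This is a Python illustration under assumptions, not Kafka's Scala code: SessionExpired stands in for ZooKeeperClientExpiredException, emigrate is a hypothetical stand-in for the per-partition emigration calls, the __transaction_state partition number 7 is made up, and a list fixes the iteration order (the real loop iterates a HashSet, so its order is not guaranteed). The guarded variant shows one possible mitigation, not a fix taken from the Kafka codebase.

```python
class SessionExpired(Exception):
    """Stand-in for ZooKeeperClientExpiredException (name is an assumption)."""


def emigrate(partition, unloaded):
    """Hypothetical per-partition handler.

    Transaction-state partitions fail as if ZooKeeper were unavailable;
    every other partition is recorded as unloaded.
    """
    topic, _ = partition
    if topic == "__transaction_state":
        raise SessionExpired("session expired")
    unloaded.append(partition)


def handle_all_unguarded(partitions, unloaded):
    # Models the reported behaviour: the first exception ends the loop,
    # so partitions later in the iteration order are never handled.
    for p in partitions:
        emigrate(p, unloaded)


def handle_all_guarded(partitions, unloaded):
    # One possible mitigation (an assumption): isolate each failure so the
    # remaining partitions are still processed, and report what failed.
    failed = []
    for p in partitions:
        try:
            emigrate(p, unloaded)
        except SessionExpired:
            failed.append(p)
    return failed


# Partition 7 of __transaction_state is a made-up example value.
partitions = [("__transaction_state", 7), ("__consumer_offsets", 15)]

unguarded = []
try:
    handle_all_unguarded(partitions, unguarded)
except SessionExpired:
    pass  # the request handler logs the error; the loop has already stopped

guarded = []
failed = handle_all_guarded(partitions, guarded)
```

      In the unguarded run nothing is unloaded, which is consistent with the absence of unloading messages in the log. In the guarded run __consumer_offsets-15 is still unloaded, and the failed partition is returned so that it could be retried.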
       

      Details of what we experienced

      We ran into this issue running Kafka 2.0.1 in production. Several Kafka consumers received stale offsets when reconnecting to their group coordinator after a leadership election on their __consumer_offsets partition. This caused them to reprocess many duplicate messages.
       
      We believe we’ve tracked down the root cause:

      • On 2019-04-01, we were having memory pressure in ZooKeeper, and we were getting several ZooKeeperClientExpiredException errors in the logs.
      • The impacted consumers were all in __consumer_offsets-15. There was a leader election on this partition, and leadership transitioned from broker 1088 to broker 1069. During this election, the former leader (1088) saw a ZooKeeperClientExpiredException (stack trace below). This happened inside KafkaApis.handleLeaderAndIsrRequest, specifically in onLeadershipChange while it was updating a __transaction_state partition. Since there are no “Scheduling unloading” or “Finished unloading” log messages in this period, we believe the exception was thrown before the loop reached __consumer_offsets-15. As a result, the broker never called GroupCoordinator.handleGroupEmigration and did not unload offsets from its GroupMetadataManager.
      • Four days later, on 2019-04-05, we manually restarted broker 1069, so broker 1088 became the leader of __consumer_offsets-15 again. When it ran GroupMetadataManager.loadGroup, it presumably failed to update groupMetadataCache because it uses putIfNotExists and would have found the group id already in the cache. Unfortunately, we did not have debug logging enabled, but I would expect to have seen a log message like "Attempt to load group ${group.groupId} from log with generation ${group.generationId} failed because there is already a cached group with generation ${currentGroup.generationId}".
      • After the leadership election, the impacted consumers reconnected to broker 1088 and received stale offsets that correspond to the last committed offsets around 2019-04-01.
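
      The put-if-absent behaviour that this timeline depends on can be sketched as follows. This is a minimal Python model, not the GroupMetadataManager implementation: put_if_not_exists mimics the semantics attributed to putIfNotExists in this report, and the group id, generations, and offsets are hypothetical values chosen for illustration.

```python
def put_if_not_exists(cache, key, value):
    """Insert value only if key is absent.

    Returns the existing value when the key is already present,
    or None when the new value was inserted.
    """
    if key in cache:
        return cache[key]
    cache[key] = value
    return None


# Entry left behind because the group was never unloaded (hypothetical values).
cache = {"my-group": {"generation": 3, "committed_offset": 100}}

# Newer state replayed from the __consumer_offsets log after regaining leadership.
reloaded = {"generation": 9, "committed_offset": 250}

# The load is rejected: the stale entry wins and is what consumers are served.
existing = put_if_not_exists(cache, "my-group", reloaded)
stale_offset = cache["my-group"]["committed_offset"]

# If the group had been unloaded first, the same load would succeed.
clean_cache = {}
inserted_over = put_if_not_exists(clean_cache, "my-group", reloaded)
fresh_offset = clean_cache["my-group"]["committed_offset"]
```

      With the stale entry in place the reload is silently rejected and the cache keeps the old committed offset. With an empty cache the reloaded state is stored, which is the outcome the skipped unload would have made possible.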

       

      Relevant log/stacktrace

      [2019-04-01 22:44:18.968617] [2019-04-01 22:44:18,963] ERROR [KafkaApi-1088] Error when handling request {controller_id=1096,controller_epoch=122,partition_states=[...,{topic=__consumer_offsets,partition=15,controller_epoch=122,leader=1069,leader_epoch=440,isr=[1092,1088,1069],zk_version=807,replicas=[1069,1088,1092],is_new=false},...],live_leaders=[{id=1069,host=10.68.42.121,port=9094}]} (kafka.server.KafkaApis)
      [2019-04-01 22:44:18.968689] kafka.zookeeper.ZooKeeperClientExpiredException: Session expired either before or while waiting for connection
      [2019-04-01 22:44:18.968712]         at kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply$mcV$sp(ZooKeeperClient.scala:238)
      [2019-04-01 22:44:18.968736]         at kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply(ZooKeeperClient.scala:226)
      [2019-04-01 22:44:18.968759]         at kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply(ZooKeeperClient.scala:226)
      [2019-04-01 22:44:18.968776]         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
      [2019-04-01 22:44:18.968804]         at kafka.zookeeper.ZooKeeperClient.kafka$zookeeper$ZooKeeperClient$$waitUntilConnected(ZooKeeperClient.scala:226)
      [2019-04-01 22:44:18.968836]         at kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply$mcV$sp(ZooKeeperClient.scala:220)
      [2019-04-01 22:44:18.968863]         at kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply(ZooKeeperClient.scala:220)
      [2019-04-01 22:44:18.968891]         at kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply(ZooKeeperClient.scala:220)
      [2019-04-01 22:44:18.968941]         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
      [2019-04-01 22:44:18.968972]         at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:219)
      [2019-04-01 22:44:18.968997]         at kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1510)
      [2019-04-01 22:44:18.969020]         at kafka.zk.KafkaZkClient.getReplicaAssignmentForTopics(KafkaZkClient.scala:463)
      [2019-04-01 22:44:18.969062]         at kafka.zk.KafkaZkClient.getTopicPartitionCount(KafkaZkClient.scala:514)
      [2019-04-01 22:44:18.969118]         at kafka.coordinator.transaction.TransactionStateManager.getTransactionTopicPartitionCount(TransactionStateManager.scala:280)
      [2019-04-01 22:44:18.969168]         at kafka.coordinator.transaction.TransactionStateManager.validateTransactionTopicPartitionCountIsStable(TransactionStateManager.scala:466)
      [2019-04-01 22:44:18.969206]         at kafka.coordinator.transaction.TransactionStateManager.removeTransactionsForTxnTopicPartition(TransactionStateManager.scala:435)
      [2019-04-01 22:44:18.969239]         at kafka.coordinator.transaction.TransactionCoordinator.handleTxnEmigration(TransactionCoordinator.scala:282)
      [2019-04-01 22:44:18.969266]         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$onLeadershipChange$1$2.apply(KafkaApis.scala:180)
      [2019-04-01 22:44:18.969293]         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$onLeadershipChange$1$2.apply(KafkaApis.scala:176)
      [2019-04-01 22:44:18.969316]         at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
      [2019-04-01 22:44:18.969341]         at kafka.server.KafkaApis.kafka$server$KafkaApis$$onLeadershipChange$1(KafkaApis.scala:176)
      [2019-04-01 22:44:18.969361]         at kafka.server.KafkaApis$$anonfun$4.apply(KafkaApis.scala:185)
      [2019-04-01 22:44:18.969383]         at kafka.server.KafkaApis$$anonfun$4.apply(KafkaApis.scala:185)
      [2019-04-01 22:44:18.969412]         at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:1117)
      [2019-04-01 22:44:18.969435]         at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:185)
      [2019-04-01 22:44:18.969454]         at kafka.server.KafkaApis.handle(KafkaApis.scala:110)
      [2019-04-01 22:44:18.969476]         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
      [2019-04-01 22:44:18.969513]         at java.lang.Thread.run(Thread.java:748)
      

       

            People

            • Assignee: Bob Barrett (bob-barrett)
            • Reporter: Mike Mintz (mikemintz)