KAFKA-13483: Stale Missing ISR on a partition after a ZooKeeper timeout


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.8.1
    • Fix Version/s: 3.1.0
    • Component/s: replication
    • Labels: None

    Description

      We hit a situation where we had a stale missing ISR on a single partition of an output changelog topic after a broker-to-ZooKeeper connection timed out in our production system. This ticket shows the logs of what happened and a workaround that got us out of this situation.

       

      Cluster config

      7 Kafka Brokers v2.8.1  (k8s bitnami)

      3 Zookeeper v3.6.2 (k8s bitnami)

      kubernetes v1.20.6

       

      Processing pattern:

      source topic 
           -> KStream application: update 40 stores backed by 
                 -> data-##-changelog topics 

       

      All topics have 10 partitions, 3 replicas, min.isr 2
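
      For illustration, the processing pattern above looks roughly like the sketch below (Kafka Streams, Java). The store name, application id and serdes are placeholders rather than our actual code; the point is that each materialized store is backed by a data-##-changelog topic, and the replication factor / min.insync.replicas override shown here matches the topic layout described above.

      import java.util.Map;
      import java.util.Properties;
      import org.apache.kafka.common.serialization.Serdes;
      import org.apache.kafka.common.utils.Bytes;
      import org.apache.kafka.streams.KafkaStreams;
      import org.apache.kafka.streams.StreamsBuilder;
      import org.apache.kafka.streams.StreamsConfig;
      import org.apache.kafka.streams.kstream.Consumed;
      import org.apache.kafka.streams.kstream.Grouped;
      import org.apache.kafka.streams.kstream.Materialized;
      import org.apache.kafka.streams.state.KeyValueStore;

      public class ChangelogTopologySketch {
          public static void main(String[] args) {
              StreamsBuilder builder = new StreamsBuilder();
              // One store shown here; the real app materializes 40 such stores,
              // each backed by its own data-##-changelog topic.
              builder.stream("source", Consumed.with(Serdes.String(), Serdes.String()))
                     .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                     .reduce((oldValue, newValue) -> newValue,
                             Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("data-23")
                                     .withKeySerde(Serdes.String())
                                     .withValueSerde(Serdes.String())
                                     // topic-level override applied to the changelog topic
                                     .withLoggingEnabled(Map.of("min.insync.replicas", "2")));

              Properties props = new Properties();
              props.put(StreamsConfig.APPLICATION_ID_CONFIG, "data-app"); // placeholder
              props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.kafka.svc.cluster.local:9092");
              props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);      // internal topics get 3 replicas
              new KafkaStreams(builder.build(), props).start();
          }
      }

      (Kafka Streams names the changelog topic <application.id>-<store name>-changelog, so the names above are only approximate.)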

      After a broker-to-ZooKeeper connection timed out (see logs below), the ISR went missing on many topic partitions.
      Almost all partitions recovered a few milliseconds later, as the connection to ZooKeeper was re-established.

      The exception was partition 3 of one of the 40 data-##-changelog topics.
      It stayed under-replicated overnight, preventing any progress on partition 3 of the source topic in the KStream app, and at the same time halting production of data to partition 3 of the 39 other changelog topics (because they also rely on partition 3 of the input topic).
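
      (For reference, under-replicated partitions like this one can be listed with kafka-topics.sh --describe --under-replicated-partitions, or programmatically. A minimal AdminClient sketch, with the bootstrap address and topic name as placeholders, would be:)

      import java.util.List;
      import java.util.Properties;
      import org.apache.kafka.clients.admin.Admin;
      import org.apache.kafka.clients.admin.AdminClientConfig;
      import org.apache.kafka.clients.admin.TopicDescription;
      import org.apache.kafka.common.TopicPartitionInfo;

      public class FindUnderReplicated {
          public static void main(String[] args) throws Exception {
              Properties props = new Properties();
              props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.kafka.svc.cluster.local:9092");
              try (Admin admin = Admin.create(props)) {
                  for (TopicDescription desc :
                          admin.describeTopics(List.of("data-23-changelog")).all().get().values()) {
                      for (TopicPartitionInfo p : desc.partitions()) {
                          // A partition is under-replicated when its ISR is smaller than its replica set.
                          if (p.isr().size() < p.replicas().size()) {
                              System.out.printf("%s-%d isr=%s replicas=%s%n",
                                      desc.name(), p.partition(), p.isr(), p.replicas());
                          }
                      }
                  }
              }
          }
      }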

      Successful Workaround
      We ran kafka-reassign-partitions.sh on that partition, with the exact same replica assignment, and the ISR came back to normal within milliseconds.

      kafka-reassign-partitions.sh --bootstrap-server kafka.kafka.svc.cluster.local:9092 --reassignment-json-file ./replicas-data-23-changlog.json 

      where replicas-data-23-changlog.json contains the original replica assignment:

      {"version":1,"partitions":[{"topic":"data-23-changelog","partition":3,"replicas":[1007,1005,1003]}]} 

       

      Questions:
      Would you be able to explain why that specific partition did not recover like the others after the ZooKeeper timeout?

      Or could it be a bug?

      We are glad the workaround worked, but is there an explanation for why it did?

      Otherwise, what should have been done to address this issue?

       

      Observed summary of the logs

       

      [2021-11-20 20:21:42,577] WARN Client session timed out, have not heard from server in 26677ms for sessionid 0x2000086f5260006 (org.apache.zookeeper.ClientCnxn)
      [2021-11-20 20:21:42,582] INFO Client session timed out, have not heard from server in 26677ms for sessionid 0x2000086f5260006, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
      [2021-11-20 20:21:44,644] INFO Opening socket connection to server zookeeper.kafka.svc.cluster.local
      [2021-11-20 20:21:44,646] INFO Socket connection established, initiating session, client: , server: zookeeper.kafka.svc.cluster.local (org.apache.zookeeper.ClientCnxn)
      [2021-11-20 20:21:44,649] INFO Session establishment complete on server zookeeper.kafka.svc.cluster.local, sessionid = 0x2000086f5260006, negotiated timeout = 40000 (org.apache.zookeeper.ClientCnxn)
       
      [2021-11-20 20:21:57,133] INFO [ReplicaFetcher replicaId=1007, leaderId=1001, fetcherId=0] Shutting down (kafka.server.ReplicaFetcherThread)
      [2021-11-20 20:21:57,137] INFO [ReplicaFetcher replicaId=1007, leaderId=1001, fetcherId=0] Error sending fetch request (sessionId=1896541533, epoch=50199) to node 1001: (org.apache.kafka.clients.FetchSessionHandler)
      java.io.IOException: Client was shutdown before response was read
              at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:109)
              at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:110)
              at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:217)
              at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:325)
              at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:141)
              at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:140)
              at scala.Option.foreach(Option.scala:407)
              at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:140)
              at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:123)
              at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
      [2021-11-20 20:21:57,141] INFO [ReplicaFetcher replicaId=1007, leaderId=1001, fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)
      [2021-11-20 20:21:57,141] INFO [ReplicaFetcher replicaId=1007, leaderId=1001, fetcherId=0] Shutdown completed (kafka.server.ReplicaFetcherThread)
      [2021-11-20 20:21:57,145] INFO [ReplicaFetcher replicaId=1007, leaderId=1005, fetcherId=0] Shutting down (kafka.server.ReplicaFetcherThread)
      [2021-11-20 20:21:57,145] INFO [ReplicaFetcher replicaId=1007, leaderId=1005, fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)
      [2021-11-20 20:21:57,146] INFO [ReplicaFetcher replicaId=1007, leaderId=1005, fetcherId=0] Shutdown completed (kafka.server.ReplicaFetcherThread)
      [2021-11-20 20:22:24,577] WARN [Partition data-23-changelog-3 broker=1007] Failed to update ISR to PendingExpandIsr(isr=Set(1007), newInSyncReplicaId=1003) due to unexpected UNKNOWN_SERVER_ERROR. Retrying. (kafka.cluster.Partition)
      [2021-11-20 20:22:24,578] ERROR [broker-1007-to-controller] Uncaught error in request completion: (org.apache.kafka.clients.NetworkClient)
      java.lang.IllegalStateException: Failed to enqueue ISR change state LeaderAndIsr(leader=1007, leaderEpoch=2, isr=List(1007, 1003), zkVersion=2) for partition data-23-changelog-3
              at kafka.cluster.Partition.sendAlterIsrRequest(Partition.scala:1379)
              at kafka.cluster.Partition.$anonfun$handleAlterIsrResponse$1(Partition.scala:1413)
              at kafka.cluster.Partition.handleAlterIsrResponse(Partition.scala:1392)
              at kafka.cluster.Partition.$anonfun$sendAlterIsrRequest$1(Partition.scala:1370)
              at kafka.cluster.Partition.$anonfun$sendAlterIsrRequest$1$adapted(Partition.scala:1370)
              at kafka.server.DefaultAlterIsrManager.$anonfun$handleAlterIsrResponse$8(AlterIsrManager.scala:262)
              at kafka.server.DefaultAlterIsrManager.$anonfun$handleAlterIsrResponse$8$adapted(AlterIsrManager.scala:259)
              at scala.collection.immutable.List.foreach(List.scala:431)
              at kafka.server.DefaultAlterIsrManager.handleAlterIsrResponse(AlterIsrManager.scala:259)
              at kafka.server.DefaultAlterIsrManager$$anon$1.onComplete(AlterIsrManager.scala:179)
              at kafka.server.BrokerToControllerRequestThread.handleResponse(BrokerToControllerChannelManager.scala:362)
              at kafka.server.BrokerToControllerRequestThread.$anonfun$generateRequests$1(BrokerToControllerChannelManager.scala:333)
              at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
              at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:584)
              at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:576)
              at kafka.common.InterBrokerSendThread.pollOnce(InterBrokerSendThread.scala:74)
              at kafka.server.BrokerToControllerRequestThread.doWork(BrokerToControllerChannelManager.scala:368)
              at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
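
      Our (hedged) reading of the IllegalStateException above: the partition appears stuck in PendingExpandIsr, and every retry of the AlterIsr request is rejected because an earlier unsent/in-flight item for the same partition apparently never got cleared. The sketch below is only a generic illustration of a "one in-flight ISR update per partition" guard, not Kafka's actual implementation:

      import java.util.Map;
      import java.util.concurrent.ConcurrentHashMap;
      import org.apache.kafka.common.TopicPartition;

      // Generic illustration only (NOT Kafka's code): at most one pending ISR update per partition.
      public class PendingIsrGuard {
          private final Map<TopicPartition, Object> unsent = new ConcurrentHashMap<>();

          // Returns false when an update for this partition is already pending,
          // which would surface as a "Failed to enqueue ISR change state" style error.
          public boolean submit(TopicPartition tp, Object isrUpdate) {
              return unsent.putIfAbsent(tp, isrUpdate) == null;
          }

          // Must run for every controller response; if an error path skips this,
          // all later submits for the partition fail and the ISR never expands.
          public void clear(TopicPartition tp) {
              unsent.remove(tp);
          }
      }

      If the clear step is skipped on an error path, the partition would stay under-replicated until something external (like the reassignment workaround above) resets the state, which matches what we observed.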

       At this point we ran kafka-reassign-partitions.sh on that partition, with the exact same replica assignment (the same command and JSON file shown above), and it fixed it.

       

       

      Log showing ISR is back to normal:

      [2021-11-21 12:36:36,727] INFO [Admin Manager on Broker 1007]: Updating broker 1007 with new configuration :  (kafka.server.ZkAdminManager)
      [2021-11-21 12:36:36,747] INFO Processing notification(s) to /config/changes (kafka.common.ZkNodeChangeNotificationListener)
      [2021-11-21 12:36:36,749] INFO Processing override for entityPath: brokers/1005 with config: Map() (kafka.server.DynamicConfigManager)
      [2021-11-21 12:36:36,752] INFO Processing override for entityPath: brokers/1007 with config: Map() (kafka.server.DynamicConfigManager)
      ...
      [2021-11-21 12:37:19,435] INFO [Partition data-23-changelog-3 broker=1007] ISR updated to 1007,1005 and version updated to [4] (kafka.cluster.Partition)
      [2021-11-21 12:37:19,928] INFO [Partition data-23-changelog-3 broker=1007] ISR updated to 1007,1005,1003 and version updated to [5] (kafka.cluster.Partition) 

       

       

      Thank you kindly for any comments and suggestions!

      Francois

            People

              Assignee: Unassigned
              Reporter: F Méthot (fmethot)