KAFKA-13556

Kafka cluster loses the controller


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 2.8.1
    • Fix Version/s: None
    • Component/s: core
    • Labels: None

    Description

      Hi team,

      Kafka version: 2.8.1
      Configuration: 3 Kafka brokers in different availability zones and 3 ZooKeeper nodes in different availability zones.

      I ran into a bug where the Kafka cluster loses its controller, and if any non-controller broker is restarted after that, it stops processing data.

      Context:
      The Kafka cluster is configured to use SASL for connections.

      # Listeners config
      listeners=SASL_SSL://[::]:9091,SASL_PLAINTEXT://[::]:9092
      advertised.listeners=SASL_SSL://---DOMAIN---:9091,SASL_PLAINTEXT://---DOMAIN---:9092
      sasl.enabled.mechanisms=SCRAM-SHA-512
      inter.broker.listener.name=SASL_SSL
      sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
      inter.broker.protocol.version=2.8-IV0
      # Keystore config
      ssl.keystore.location=/etc/kafka/ssl/server.keystore.jks
      ssl.keystore.password=---PASSWORD---
      ssl.key.password=---PASSWORD---
      ssl.truststore.location=/etc/kafka/ssl/server.truststore.jks
      ssl.truststore.password=---PASSWORD--- 
      
      zookeeper.connect=---ZK_HOST_1---:2181,---ZK_HOST_2---:2181,---ZK_HOST_3---:2181
      zookeeper.connection.timeout.ms=6000 

      ZooKeeper does not have a SASL configuration and accepts connections without any authentication.

      So, when I start the Kafka brokers I see an auth-failure error in the logs, but the ZK client then falls back to an unauthenticated connection and everything works fine.

      [2021-12-16 14:04:08,451] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
      [2021-12-16 14:04:08,462] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
      [2021-12-16 14:04:08,466] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:AuthFailed type:None path:null (kafka.zookeeper.ZooKeeperClient)
      [2021-12-16 14:04:08,467] ERROR [ZooKeeperClient Kafka server] Auth failed. (kafka.zookeeper.ZooKeeperClient)
      [2021-12-16 14:04:08,470] INFO Opening socket connection to server ===ZOOKEEPER_HOST_1===:2181 (org.apache.zookeeper.ClientCnxn)
      [2021-12-16 14:04:08,478] INFO Socket connection established, initiating session, client: /===BROKER_2_IP===:48022, server: ===ZOOKEEPER_HOST_1===:2181 (org.apache.zookeeper.ClientCnxn)
      [2021-12-16 14:04:08,496] INFO Session establishment complete on server ===ZOOKEEPER_HOST_1===:2181, sessionid = 0x10028c111930125, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
      [2021-12-16 14:04:08,497] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:SyncConnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
      [2021-12-16 14:04:08,498] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
      [2021-12-16 14:04:08,753] INFO [feature-zk-node-event-process-thread]: Starting (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread) 

      Then I restart the ZK node that holds the connection with the Kafka controller node (===ZOOKEEPER_HOST_1===), or it restarts for some other reason, and the logs show the following:

      [2021-12-16 14:08:42,951] INFO Unable to read additional data from server sessionid 0x10028c111930126, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
      [2021-12-16 14:08:42,951] INFO Unable to read additional data from server sessionid 0x10028c111930125, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
      [2021-12-16 14:08:43,052] DEBUG [ZooKeeperClient ACL authorizer] Received event: WatchedEvent state:Disconnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
      [2021-12-16 14:08:43,053] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:Disconnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
      [2021-12-16 14:08:43,583] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
      [2021-12-16 14:08:43,583] INFO Opening socket connection to server ===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn)
      [2021-12-16 14:08:43,583] DEBUG [ZooKeeperClient ACL authorizer] Received event: WatchedEvent state:AuthFailed type:None path:null (kafka.zookeeper.ZooKeeperClient)
      [2021-12-16 14:08:43,584] ERROR [ZooKeeperClient ACL authorizer] Auth failed. (kafka.zookeeper.ZooKeeperClient)
      [2021-12-16 14:08:43,593] INFO Socket connection established, initiating session, client: /===BROKER_2_IP===:48560, server: ===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn)
      [2021-12-16 14:08:43,808] INFO Unable to read additional data from server sessionid 0x10028c111930126, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
      [2021-12-16 14:08:43,910] DEBUG [ZooKeeperClient ACL authorizer] Received event: WatchedEvent state:Disconnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
      [2021-12-16 14:08:44,019] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
      [2021-12-16 14:08:44,019] INFO Opening socket connection to server ===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn)
      [2021-12-16 14:08:44,019] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:AuthFailed type:None path:null (kafka.zookeeper.ZooKeeperClient)
      [2021-12-16 14:08:44,019] ERROR [ZooKeeperClient Kafka server] Auth failed. (kafka.zookeeper.ZooKeeperClient)
      [2021-12-16 14:08:44,022] INFO Socket connection established, initiating session, client: /===BROKER_2_IP===:37734, server: ===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn)
      [2021-12-16 14:08:44,023] INFO Unable to read additional data from server sessionid 0x10028c111930125, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
      [2021-12-16 14:08:44,123] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:Disconnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
      [2021-12-16 14:08:44,124] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
      [2021-12-16 14:08:44,125] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
      [2021-12-16 14:08:44,586] INFO [ZooKeeperClient ACL authorizer] Reinitializing due to auth failure. (kafka.zookeeper.ZooKeeperClient)
      [2021-12-16 14:08:44,704] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
      [2021-12-16 14:08:44,704] INFO Opening socket connection to server ===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn)
      [2021-12-16 14:08:44,705] DEBUG [ZooKeeperClient ACL authorizer] Received event: WatchedEvent state:AuthFailed type:None path:null (kafka.zookeeper.ZooKeeperClient)
      [2021-12-16 14:08:44,705] ERROR [ZooKeeperClient ACL authorizer] Auth failed. (kafka.zookeeper.ZooKeeperClient)
      [2021-12-16 14:08:44,707] INFO Socket connection established, initiating session, client: /===BROKER_2_IP===:37738, server: ===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn)
      [2021-12-16 14:08:44,709] INFO Session establishment complete on server ===ZOOKEEPER_HOST_2===:2181, sessionid = 0x10028c111930126, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
      [2021-12-16 14:08:44,710] DEBUG [ZooKeeperClient ACL authorizer] Received event: WatchedEvent state:SyncConnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
      [2021-12-16 14:08:44,798] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
      [2021-12-16 14:08:44,798] INFO Opening socket connection to server ===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn)
      [2021-12-16 14:08:44,798] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:AuthFailed type:None path:null (kafka.zookeeper.ZooKeeperClient)
      [2021-12-16 14:08:44,798] ERROR [ZooKeeperClient Kafka server] Auth failed. (kafka.zookeeper.ZooKeeperClient)
      [2021-12-16 14:08:44,807] INFO Socket connection established, initiating session, client: /===BROKER_2_IP===:48568, server: ===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn)
      [2021-12-16 14:08:44,824] INFO Session establishment complete on server ===ZOOKEEPER_HOST_3===:2181, sessionid = 0x10028c111930125, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
      [2021-12-16 14:08:44,824] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:SyncConnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
      [2021-12-16 14:08:44,824] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
      [2021-12-16 14:08:45,020] INFO [ZooKeeperClient Kafka server] Reinitializing due to auth failure. (kafka.zookeeper.ZooKeeperClient) 

      The main points here are:
      1. The log above is from the Kafka controller node.
      2. The connection to ZK was lost: "Received event: WatchedEvent state:Disconnected".
      3. A connection was then established with the live ZK nodes: "Opening socket connection to server ===ZOOKEEPER_HOST_3===:2181" and "Received event: WatchedEvent state:SyncConnected".
      4. During the connection process we get the "Auth failed." error, just as during broker startup.
      5. The "Auth failed" error is caught in kafka.zookeeper.ZooKeeperClient.ZooKeeperClientWatcher.process, which calls:

      if (initialized)
          scheduleReinitialize("auth-failed", "Reinitializing due to auth failure.", RetryBackoffMs) 

      where RetryBackoffMs is 1000.

      One second after the "Auth failed" message we see the "Reinitializing due to auth failure." message in the log.
      The method ZooKeeperClient.scheduleReinitialize calls ZooKeeperClient.reinitialize.
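      A rough sketch of what scheduleReinitialize does, paraphrased from the 2.8 sources (not a verbatim copy): it schedules a one-shot task that logs the given message and then calls reinitialize() after the delay.

        // Paraphrased sketch of kafka.zookeeper.ZooKeeperClient.scheduleReinitialize (2.8 branch).
        private def scheduleReinitialize(name: String, message: String, delayMs: Long): Unit = {
          reinitializeScheduler.schedule(name, () => {
            info(message)
            reinitialize()
          }, delayMs, period = -1L, unit = TimeUnit.MILLISECONDS)
        }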

       

        private def reinitialize(): Unit = {
          // Initialization callbacks are invoked outside of the lock to avoid deadlock potential since their completion
          // may require additional Zookeeper requests, which will block to acquire the initialization lock
          stateChangeHandlers.values.foreach(callBeforeInitializingSession _)
          inWriteLock(initializationLock) {
            if (!connectionState.isAlive) {
              zooKeeper.close()
              info(s"Initializing a new session to $connectString.")
              // retry forever until ZooKeeper can be instantiated
              var connected = false
              while (!connected) {
                try {
                  zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher, clientConfig)
                  connected = true
                } catch {
                  case e: Exception =>
                    info("Error when recreating ZooKeeper, retrying after a short sleep", e)
                    Thread.sleep(RetryBackoffMs)
                }
              }
            }
          }
          stateChangeHandlers.values.foreach(callAfterInitializingSession _)
        }

      The code inside the "inWriteLock(initializationLock)" block does not run, because the connection has already been re-established with the live ZK nodes (I did not find the message s"Initializing a new session to $connectString." in the logs).
      But "callBeforeInitializingSession" still calls "KafkaController.startup.beforeInitializingSession", which fires the "ControllerEvent.Expire" event, and "callAfterInitializingSession" calls "KafkaController.startup.afterInitializingSession", which fires the "ControllerEvent.RegisterBrokerAndReelect" event.
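      For context, the state change handler registered in KafkaController.startup looks roughly like this (paraphrased from the 2.8 sources, not a verbatim copy):

        // Paraphrased sketch of the handler registered in kafka.controller.KafkaController.startup (2.8 branch).
        zkClient.registerStateChangeHandler(new StateChangeHandler {
          override val name: String = StateChangeHandlers.ControllerHandler

          override def beforeInitializingSession(): Unit = {
            // Queue an Expire event and wait until it is processed, so all pending
            // controller events are handled before the "new" session is used.
            val queuedEvent = eventManager.clearAndPut(Expire)
            queuedEvent.awaitProcessing()
          }

          override def afterInitializingSession(): Unit = {
            eventManager.put(RegisterBrokerAndReelect)
          }
        })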

      The event "ControllerEvent.Expire" call "KafkaController.processExpire" method which shutting down the current controller and in logs we see next:

      [2021-12-16 14:08:45,022] DEBUG [Controller id=2] Resigning (kafka.controller.KafkaController)
      [2021-12-16 14:08:45,022] DEBUG [Controller id=2] Unregister BrokerModifications handler for Set(1, 2, 3) (kafka.controller.KafkaController)
      [2021-12-16 14:08:45,023] INFO [PartitionStateMachine controllerId=2] Stopped partition state machine (kafka.controller.ZkPartitionStateMachine)
      [2021-12-16 14:08:45,025] INFO [ReplicaStateMachine controllerId=2] Stopped replica state machine (kafka.controller.ZkReplicaStateMachine)
      [2021-12-16 14:08:45,026] INFO [RequestSendThread controllerId=2] Shutting down (kafka.controller.RequestSendThread)
      [2021-12-16 14:08:45,026] INFO [RequestSendThread controllerId=2] Stopped (kafka.controller.RequestSendThread)
      [2021-12-16 14:08:45,026] INFO [RequestSendThread controllerId=2] Shutdown completed (kafka.controller.RequestSendThread)
      [2021-12-16 14:08:45,031] INFO [RequestSendThread controllerId=2] Shutting down (kafka.controller.RequestSendThread)
      [2021-12-16 14:08:45,032] INFO [RequestSendThread controllerId=2] Stopped (kafka.controller.RequestSendThread)
      [2021-12-16 14:08:45,033] INFO [RequestSendThread controllerId=2] Shutdown completed (kafka.controller.RequestSendThread)
      [2021-12-16 14:08:45,037] INFO [RequestSendThread controllerId=2] Shutting down (kafka.controller.RequestSendThread)
      [2021-12-16 14:08:45,037] INFO [RequestSendThread controllerId=2] Stopped (kafka.controller.RequestSendThread)
      [2021-12-16 14:08:45,037] INFO [RequestSendThread controllerId=2] Shutdown completed (kafka.controller.RequestSendThread)
      [2021-12-16 14:08:45,040] INFO [Controller id=2] Resigned (kafka.controller.KafkaController) 
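      For reference, processExpire is roughly the following (paraphrased from the 2.8 sources): it clears the locally cached controller id and then runs the resignation logic that produces the log lines above.

        // Paraphrased sketch of kafka.controller.KafkaController.processExpire (2.8 branch).
        private def processExpire(): Unit = {
          // Forget who the controller is locally...
          activeControllerId = -1
          // ...and tear down controller state (handlers, state machines, RequestSendThreads).
          onControllerResignation()
        }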

      And then "ControllerEvent.RegisterBrokerAndReelect" event calls method "KafkaController.processRegisterBrokerAndReelect" => "KafkaController.elect". But in the logs there is next:

       

      [2021-12-16 14:08:45,129] DEBUG [Controller id=2] Broker 2 has been elected as the controller, so stopping the election process. (kafka.controller.KafkaController) 

      Here is the part of the "KafkaController.elect" method that writes the message above:

        private def elect(): Unit = {
          activeControllerId = zkClient.getControllerId.getOrElse(-1)
          /*
           * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition,
           * it's possible that the controller has already been elected when we get here. This check will prevent the following
           * createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
           */
          if (activeControllerId != -1) {
            debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
            return
          } 

      The problem is:
      The current controller was shut down by the "KafkaController.processExpire" handling, but a new controller was not elected because ZK did not remove the /controller znode (the session reconnected rather than expiring, so the ephemeral node survived).

      So in the logs we see that Controller id=2 resigned and then stopped the election because it thinks it is still the live controller, even though all of its ZK listeners have already been shut down.

      If I then restart any non-controller broker (or it restarts for any reason), it does not receive metadata because there is no controller in the cluster.
      And if we try to consume data from these brokers we get the following errors:

      /opt/kafka/bin/kafka-console-consumer.sh --topic test_topic --from-beginning --bootstrap-server  $(hostname):9091 --consumer.config ~/connect.properties 
      
      WARN [Consumer clientId=consumer-console-consumer-57293-1, groupId=console-consumer-57293] Error while fetching metadata with correlation id 2 : {test_topic=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
      WARN [Consumer clientId=consumer-console-consumer-57293-1, groupId=console-consumer-57293] Error while fetching metadata with correlation id 3 : {test_topic=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)

      UPD: Workaround for this bug (sketches of both options follow below):

      1. Set up a proper SASL ('Client') configuration for the ZK connection, or
      2. Disable SASL for the ZK connection via the JVM property "-Dzookeeper.sasl.client=false".
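
      A minimal sketch of both options; the 'Client' login module and credentials below are placeholders and must match how the ZooKeeper ensemble is actually secured:

      # Option 1 (hypothetical): add a 'Client' section to /etc/kafka/kafka_server_jaas.conf
      # so the ZK client can authenticate instead of falling back to an unauthenticated connection.
      Client {
          org.apache.zookeeper.server.auth.DigestLoginModule required
          username="---ZK_USER---"
          password="---ZK_PASSWORD---";
      };

      # Option 2: disable ZK SASL on the broker side, e.g. via KAFKA_OPTS before starting Kafka.
      export KAFKA_OPTS="$KAFKA_OPTS -Dzookeeper.sasl.client=false"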

       

      People

        Assignee: joecqupt
        Reporter: Andrei Lakhmanets
        Votes: 0
        Watchers: 2