KAFKA-13468

Consumers may hang because IOException in Log#<init> does not trigger KafkaStorageException


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.8.0
    • Fix Version/s: None
    • Component/s: log
    • Labels: None

    Description

      When the Kafka Log class (`core/src/main/scala/kafka/log/Log.scala`) is initialized, it may encounter an IOException in the locally block, e.g., when the log directory cannot be created due to a permission issue, or when `initializeLeaderEpochCache`, `initializePartitionMetadata`, etc. throw an IOException.

      class Log(...) {
        // ...
        locally {
          // create the log directory if it doesn't exist
          Files.createDirectories(dir.toPath)
      
          initializeLeaderEpochCache()
          initializePartitionMetadata()
      
          val nextOffset = loadSegments()
          // ...
        }
        // ...
      }
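      For example, a minimal standalone sketch (hypothetical paths; assumes a POSIX filesystem and a non-root user) of the kind of IOException this block can hit when the log directory cannot be created:

        import java.io.IOException
        import java.nio.file.Files
        import java.nio.file.attribute.PosixFilePermissions

        object CreateDirSketch extends App {
          // simulate a log dir whose parent directory is not writable
          val parent = Files.createTempDirectory("kafka-logs-")
          Files.setPosixFilePermissions(parent, PosixFilePermissions.fromString("r-xr-xr-x"))
          try {
            // mirrors Files.createDirectories(dir.toPath) in Log's locally block
            Files.createDirectories(parent.resolve("gray-1-0-1"))
          } catch {
            case e: IOException => println(s"log dir creation failed: $e") // e.g., AccessDeniedException
          }
        }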

      We found that the broker encountering the IOException prints a KafkaApi error log entry like the following and then proceeds.

      [2021-11-17 22:41:30,057] ERROR [KafkaApi-1] Error when handling request: clientId=1, correlationId=1, api=LEADER_AND_ISR, version=5, body=LeaderAndIsrRequestData(controllerId=1, controllerEpoch=1, brokerEpoch=4294967362, type=0, ungroupedPartitionStates=[], topicStates=[LeaderAndIsrTopicState(topicName='gray-2-0', topicId=573bAVHfRQeXApzAKevNIg, partitionStates=[LeaderAndIsrPartitionState(topicName='gray-2-0', partitionIndex=1, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1, 3], zkVersion=0, replicas=[1, 3], addingReplicas=[], removingReplicas=[], isNew=true)]), LeaderAndIsrTopicState(topicName='gray-1-0', topicId=12dW2FxLTiyKmGi41HhdZQ, partitionStates=[LeaderAndIsrPartitionState(topicName='gray-1-0', partitionIndex=1, controllerEpoch=1, leader=3, leaderEpoch=0, isr=[3, 1], zkVersion=0, replicas=[3, 1], addingReplicas=[], removingReplicas=[], isNew=true)]), LeaderAndIsrTopicState(topicName='gray-3-0', topicId=_yvmANyZSoK_PTV0e-nqCA, partitionStates=[LeaderAndIsrPartitionState(topicName='gray-3-0', partitionIndex=1, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1, 3], zkVersion=0, replicas=[1, 3], addingReplicas=[], removingReplicas=[], isNew=true)])], liveLeaders=[LeaderAndIsrLiveLeader(brokerId=1, hostName='localhost', port=9791), LeaderAndIsrLiveLeader(brokerId=3, hostName='localhost', port=9793)]) (kafka.server.RequestHandlerHelper) 

      But all the consumers consuming data from the affected topics (“gray-2-0”, “gray-1-0”, “gray-3-0”) are unable to proceed. These consumers print no error log related to this issue; they simply hang for more than 3 minutes.
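      A minimal sketch of a consumer loop that can be used to observe the hang (the topic names come from the run above; the bootstrap address and group id are hypothetical). When the issue is triggered, poll() keeps returning empty batches and nothing is logged on the client side:

        import java.time.Duration
        import java.util.{Arrays, Properties}
        import org.apache.kafka.clients.consumer.KafkaConsumer
        import org.apache.kafka.common.serialization.StringDeserializer

        object HangObserver extends App {
          val props = new Properties()
          props.put("bootstrap.servers", "localhost:9791")   // hypothetical broker address
          props.put("group.id", "gray-consumers")            // hypothetical group id
          props.put("auto.offset.reset", "earliest")
          props.put("key.deserializer", classOf[StringDeserializer].getName)
          props.put("value.deserializer", classOf[StringDeserializer].getName)

          val consumer = new KafkaConsumer[String, String](props)
          consumer.subscribe(Arrays.asList("gray-1-0", "gray-2-0", "gray-3-0"))

          // poll for 3 minutes; with the bug triggered, no records arrive and no error is reported
          val deadline = System.currentTimeMillis() + 3 * 60 * 1000
          var consumed = 0L
          while (System.currentTimeMillis() < deadline)
            consumed += consumer.poll(Duration.ofSeconds(1)).count()
          println(s"records consumed in 3 minutes: $consumed")
          consumer.close()
        }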

      The IOException sometimes affects multiple partitions of the internal `__consumer_offsets` topic:

      [2021-11-18 10:57:41,289] ERROR [KafkaApi-1] Error when handling request: clientId=1, correlationId=11, api=LEADER_AND_ISR, version=5, body=LeaderAndIsrRequestData(controllerId=1, controllerEpoch=1, brokerEpoch=4294967355, type=0, ungroupedPartitionStates=[], topicStates=[LeaderAndIsrTopicState(topicName='__consumer_offsets', topicId=_MiMTCViS76osIyDdxekIg, partitionStates=[LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=15, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true), LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=48, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true), LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=45, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true), ...
      addingReplicas=[], removingReplicas=[], isNew=true), LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=33, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true)])], liveLeaders=[LeaderAndIsrLiveLeader(brokerId=1, hostName='localhost', port=9791)]) (kafka.server.RequestHandlerHelper) 

      Analysis

      The key stacktrace is as follows:

      "java.lang.Thread,run,748",
      "kafka.server.KafkaRequestHandler,run,74",
      "kafka.server.KafkaApis,handle,236",
      "kafka.server.KafkaApis,handleLeaderAndIsrRequest,258",
      "kafka.server.ReplicaManager,becomeLeaderOrFollower,1411",
      "kafka.server.ReplicaManager,makeLeaders,1566",
      "scala.collection.mutable.HashMap,foreachEntry,499",
      "scala.collection.mutable.HashMap$Node,foreachEntry,633",
      "kafka.utils.Implicits$MapExtensionMethods$,$anonfun$forKeyValue$1,62",
      "kafka.server.ReplicaManager,$anonfun$makeLeaders$5,1568",
      "kafka.cluster.Partition,makeLeader,548",
      "kafka.cluster.Partition,$anonfun$makeLeader$1,564",
      "kafka.cluster.Partition,createLogIfNotExists,324",
      "kafka.cluster.Partition,createLog,344",
      "kafka.log.LogManager,getOrCreateLog,783",
      "scala.Option,getOrElse,201",
      "kafka.log.LogManager,$anonfun$getOrCreateLog$1,830",
      "kafka.log.Log$,apply,2601",
      "kafka.log.Log,<init>,323" 

      Basically, the IOException is not handled by Log but instead gets propagated all the way back to `core/src/main/scala/kafka/server/KafkaApis.scala`:

        override def handle(request: RequestChannel.Request): Unit = {
          try {
            request.header.apiKey match {
              // ...
              case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
              // ...
            }
          } catch {
            case e: FatalExitError => throw e
            case e: Throwable => requestHelper.handleError(request, e)
          } finally {
            // ...
          }
        }
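      The error code that ends up in the LeaderAndIsr response depends on how `requestHelper.handleError` classifies the thrown exception. A minimal, self-contained sketch using the public `org.apache.kafka.common.protocol.Errors` API (illustrative, not an excerpt from the broker code) shows why the exception type matters:

        import java.io.IOException
        import org.apache.kafka.common.errors.KafkaStorageException
        import org.apache.kafka.common.protocol.Errors

        object ErrorMappingSketch extends App {
          // a raw IOException is not a Kafka ApiException, so it maps to a generic error code
          println(Errors.forException(new IOException("simulated disk failure")))           // UNKNOWN_SERVER_ERROR
          // KafkaStorageException maps to the dedicated storage error code that callers can react to
          println(Errors.forException(new KafkaStorageException("simulated disk failure"))) // KAFKA_STORAGE_ERROR
        }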
      

      I also noticed that ReplicaManager in `core/src/main/scala/kafka/server/ReplicaManager.scala` has a relevant comment about this kind of “unexpected error”, with a TODO:

        /*
         * Make the current broker to become leader for a given set of partitions by:
         *
         * 1. Stop fetchers for these partitions
         * 2. Update the partition metadata in cache
         * 3. Add these partitions to the leader partitions set
         *
         * If an unexpected error is thrown in this function, it will be propagated to KafkaApis where
         * the error message will be set on each partition since we do not know which partition caused it. Otherwise,
         * return the set of partitions that are made leader due to this method
         *
         *  TODO: the above may need to be fixed later
         */
        private def makeLeaders(...): Set[Partition] = {
          // ...
          try {
            // ...
            partitionStates.forKeyValue { (partition, partitionState) =>
              try {
                if (partition.makeLeader(partitionState, highWatermarkCheckpoints)) // line 1568
                  partitionsToMakeLeaders += partition
                else
                  stateChangeLogger.info(...)
              } catch {
                case e: KafkaStorageException =>
                  stateChangeLogger.error(...)
                  val dirOpt = getLogDir(partition.topicPartition)
                  error(...)
                  responseMap.put(partition.topicPartition, Errors.KAFKA_STORAGE_ERROR)
              }
            }
          } catch {
            case e: Throwable =>
              partitionStates.keys.foreach { partition =>
                stateChangeLogger.error(...)
              }
              // Re-throw the exception for it to be caught in KafkaApis
              throw e
          }
          // ...
        } 

      Fix

      To fix this issue, I think we should catch the potential IOException when Log is initialized, and then throw a KafkaStorageException, just like many other IOException handlers in Kafka, e.g., https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala#L92-L120 
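      A minimal sketch of this direction (not an exact patch; the message text is illustrative):

        // in Log's initialization, wrap the IO-sensitive work and rethrow as KafkaStorageException,
        // so that callers such as ReplicaManager#makeLeaders can handle it per partition
        locally {
          try {
            // create the log directory if it doesn't exist
            Files.createDirectories(dir.toPath)

            initializeLeaderEpochCache()
            initializePartitionMetadata()

            val nextOffset = loadSegments()
            // ...
          } catch {
            case e: IOException =>
              throw new KafkaStorageException(
                s"Error while initializing log for $topicPartition in dir ${dir.getParent}", e)
          }
        }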

      After applying this fix, the aforementioned symptoms disappear, i.e., the consumers no longer hang and proceed to finish the remaining workload.

      One question is whether we should also use `logDirFailureChannel.maybeAddOfflineLogDir` to handle the IOException, as in https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala#L92-L120 and https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala#L126-L139. If so, `maybeAddOfflineLogDir` would crash the node according to the protocol in https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/server/ReplicaManager.scala#L268-L277 and https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/server/ReplicaManager.scala#L327-L332
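      If we take that route, the catch clause in the sketch above could instead look like the following (assuming the existing `LogDirFailureChannel#maybeAddOfflineLogDir(logDir, msg, e)` signature and the `logDirFailureChannel` that Log already receives):

        case e: IOException =>
          val logDir = dir.getParent
          // mark the whole log directory offline; per the ReplicaManager protocol linked above,
          // this can ultimately bring the broker down
          logDirFailureChannel.maybeAddOfflineLogDir(logDir,
            s"Error while initializing log for $topicPartition in dir $logDir", e)
          throw new KafkaStorageException(
            s"Error while initializing log for $topicPartition in dir $logDir", e)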

      P.S. This issue exists from Kafka version 2.8.0 through 3.0.0. In the current trunk branch, `core/src/main/scala/kafka/log/Log.scala` has been renamed to `core/src/main/scala/kafka/log/UnifiedLog.scala` with some small code changes, but the issue still holds, so we are submitting the pull request with the fix against trunk. We also propose that the fix be applied to versions 2.8.0 and 3.0.0, etc., with another pull request.

      People

        Assignee: Unassigned
        Reporter: Haoze Wu
