Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-17727

Log dirs marked as offline incorrectly due to race conditions on segment delete

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Patch Available
    • Major
    • Resolution: Unresolved
    • 3.5.2
    • 4.0.0
    • core
    • None

    Description

      We are using a kafka cluster deployed on-premise. The brokers are JBOD with around 5/6 disks per broker. When running a intra broker rebalance (ie moving partitions between the log dirs) triggered by cruise-control, some nodes had a log dir marked as offline. When we looked closed the disk was normal and with a broker restart the log dir became online again.
      Investigating the issue, it's seems very similar to KAFKA-15391 and, specially, with KAFKA-15572. The main difference on the logs between the issue we encountered and the one described at KAFKA-15572 is that there the exception that marked the log dir as offline was a `java.nio.file.NoSuchFileException`. In our case, we had a `java.nio.channels.ClosedChannelException`:

      [2024-10-03 09:48:04,704] ERROR Error while flushing log for mytopic-20 in dir /data/0/kafka with offset 844857727 (exclusive) and recovery point 844857727 (org.apache.kafka.storage.internals.log.LogDirFailureChannel)
      java.nio.channels.ClosedChannelException
      at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
      at java.base/sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:452)
      at org.apache.kafka.common.record.FileRecords.flush(FileRecords.java:197)
      at kafka.log.LogSegment.$anonfun$flush$1(LogSegment.scala:471)
      at kafka.log.LogSegment.$anonfun$flush$1$adapted(LogSegment.scala:470)
      at com.yammer.metrics.core.Timer.time(Timer.java:91)
      at kafka.log.LogSegment.flush(LogSegment.scala:470)
      at kafka.log.LocalLog.$anonfun$flush$1(LocalLog.scala:174)
      at kafka.log.LocalLog.$anonfun$flush$1$adapted(LocalLog.scala:174)
      at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:575)
      at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:573)
      at scala.collection.AbstractIterable.foreach(Iterable.scala:933)
      at kafka.log.LocalLog.flush(LocalLog.scala:174)
      at kafka.log.UnifiedLog.$anonfun$flush$2(UnifiedLog.scala:1537)
      at kafka.log.UnifiedLog.flush(UnifiedLog.scala:1724)
      at kafka.log.UnifiedLog.flushUptoOffsetExclusive(UnifiedLog.scala:1518)
      at kafka.log.UnifiedLog.$anonfun$roll$1(UnifiedLog.scala:1499)
      at org.apache.kafka.server.util.KafkaScheduler.lambda$schedule$1(KafkaScheduler.java:150)
      at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      at java.base/java.lang.Thread.run(Thread.java:829)

      Investigating kafka code, I believe the root cause is very similar to the one KAFKA-15572(https://issues.apache.org/jira/browse/KAFKA-15572). In our case, we can see via logs that LogManager.scala was able to replace the old log with the new one: 

      [2024-10-03 09:47:06,349] INFO Attempting to replace current log Log(dir=/data/0/kafka/mytopic-20, topicId=jl0IzzqWSHedIunbTLziYg, topic=mytopic, partition=20, highWatermark=844861246, lastStableOffset=844861246, logStartOffset=842164116, logEndOffset=844861247) with Log(dir=/data/5/kafka/mytopic-20.9fa1dfec1c4a4045b8806565eced19bd-future, topicId=jl0IzzqWSHedIunbTLziYg, topic=mytopic, partition=20, highWatermark=844861246, lastStableOffset=844861246, logStartOffset=842164116, logEndOffset=844861247) for mytopic-20 (kafka.log.LogManager)
      INFO Cleaning for partition mytopic-20 is resumed (kafka.log.LogManager)
      [2024-10-03 09:47:06,364] INFO The current replica is successfully replaced with the future replica for mytopic-20 (kafka.log.LogManager)

      During this process, it closes the old log (LogManager.scala#L1125) and schedule if it to be deleted.  Something triggers UnifiedLog.roll and anything that triggers flush after that for this segment will eventually call LocalLog.flush which will try to close each segment to flush [LocalLog.scala#L174 Here, however, the segment is already closed and thus the exception. It's not clear to me what triggers the roll. In theory the old log was already replaced and the only thing remaining for this old segments it to be deleted. As this is a high volume topic, however, it's not surprising to have segments rolling frequently. 

      Attachments

        Issue Links

          Activity

            People

              gnarula Gaurav Narula
              lucianosabenca Luciano Sabença
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated: