KAFKA-5316

Log cleaning can increase message size and cause cleaner to crash with buffer overflow

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.10.2.2, 0.11.0.0
    • Component/s: None
    • Labels: None

      Description

      We have observed in practice that a compressed record set can grow after cleaning. Since the cleaner's input and output buffers are the same size, this can overflow the output buffer:

      [2017-05-23 15:05:15,480] ERROR [kafka-log-cleaner-thread-0], Error due to  (kafka.log.LogCleaner)
      java.nio.BufferOverflowException
              at java.nio.HeapByteBuffer.put(HeapByteBuffer.java:206)
              at org.apache.kafka.common.record.LogEntry.writeTo(LogEntry.java:104)
              at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:163)
              at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:114)
              at kafka.log.Cleaner.cleanInto(LogCleaner.scala:468)
              at kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:405)
              at kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:401)
              at scala.collection.immutable.List.foreach(List.scala:318)
              at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:401)
              at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:363)
              at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:362)
              at scala.collection.immutable.List.foreach(List.scala:318)
              at kafka.log.Cleaner.clean(LogCleaner.scala:362)
              at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:241)
              at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:220)
              at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
      [2017-05-23 15:05:15,481] INFO [kafka-log-cleaner-thread-0], Stopped  (kafka.log.LogCleaner)
      

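      For illustration, the failure mode can be reproduced with plain java.nio buffers (a standalone sketch, not the cleaner's actual code): if the filtered and recompressed output is even slightly larger than the fixed-size write buffer, ByteBuffer.put raises the exception shown above.

        import java.nio.ByteBuffer;

        public class CleanerOverflowSketch {
            public static void main(String[] args) {
                // The cleaner sizes its read and write buffers identically.
                ByteBuffer readBuffer = ByteBuffer.allocate(16);
                ByteBuffer writeBuffer = ByteBuffer.allocate(16);

                readBuffer.put(new byte[16]); // the input batch fills the read buffer

                // Suppose recompressing the retained records yields 20 bytes:
                // copying into the same-sized write buffer fails.
                byte[] recompressed = new byte[20];
                writeBuffer.put(recompressed); // throws java.nio.BufferOverflowException
            }
        }
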
      It is then also possible for a compressed message set to grow beyond the max message size. Given the changes to fetch semantics in KIP-74, the suggestion for this case is to allow the recompressed message set to exceed the max message size. This should be rare in practice and will not prevent consumers from making progress.

      To handle the overflow issue, one option is to allocate a temporary buffer when filtering in MemoryRecords.filterTo and return it in the result. As an optimization, we can resort to this only when there is a single recompressed message set which is larger than the entire write buffer.
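
      A rough sketch of that fallback using only standard ByteBuffer operations (the names here are illustrative, not the actual filterTo signature): when the retained batch will not fit in the original write buffer, allocate a larger temporary buffer, copy what has been written so far, and return the temporary buffer in the result so it is the one flushed to the log.

        import java.nio.ByteBuffer;

        final class FilterOutput {
            // Hypothetical helper (not the real filterTo signature): make sure the
            // next retained batch fits, growing into a temporary buffer if needed.
            static ByteBuffer ensureRoomFor(ByteBuffer output, int batchSize) {
                if (output.remaining() >= batchSize)
                    return output;
                int needed = output.position() + batchSize;
                ByteBuffer temporary = ByteBuffer.allocate(Math.max(needed, 2 * output.capacity()));
                output.flip();
                temporary.put(output); // carry over records already written
                return temporary;      // returned in the filter result instead of the original
            }
        }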

            People

            • Assignee: hachikuji (Jason Gustafson)
            • Reporter: hachikuji (Jason Gustafson)
            • Votes: 0
            • Watchers: 4
