KAFKA-2235: LogCleaner offset map overflow


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.1, 0.8.2.0
    • Fix Version/s: 0.9.0.0
    • Component/s: core, log
    • Labels: None
    • Flags: Patch

      Description

      We've seen log cleaning generate an error for a topic with lots of small messages. It seems that an offset map overflow is possible if a log segment contains more unique keys than there are empty slots in the offsetMap. Checking baseOffset and map utilization before processing a segment is not enough because it doesn't take the segment size (the number of unique messages in the segment) into account.

      I suggest estimating the upper bound on the number of keys in a segment as the number of messages in the segment, and comparing it with the number of available slots in the map (keeping the desired load factor in mind). This works in all cases where an empty map is capable of holding the keys of a single segment. If even a single segment cannot fit into an empty map, the cleanup process will still fail. Should there perhaps be a limit on the number of entries per log segment?
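      A minimal sketch of the proposed pre-check (the names MapState, segmentFits, and the field layout are illustrative, not the actual kafka.log.OffsetMap API): before building the offset map for a segment, bound the unique-key count by the segment's message count and test whether it fits in the remaining slots at the desired load factor.

      object OffsetMapCheck {
        // Hypothetical stand-in for kafka.log.OffsetMap; only the two fields
        // the check needs: total slots and slots already occupied.
        final case class MapState(slots: Int, size: Int)

        // Upper-bound the unique keys in a segment by its message count and
        // test whether they fit at the desired load factor. A segment with
        // duplicate keys will use fewer slots, so this check is conservative.
        def segmentFits(segmentMessageCount: Int, map: MapState, loadFactor: Double): Boolean =
          segmentMessageCount <= (map.slots * loadFactor).toInt - map.size

        def main(args: Array[String]): Unit = {
          val map = MapState(slots = 1000, size = 700)
          println(segmentFits(200, map, 0.9)) // 200 <= 900 - 700: segment may be processed
          println(segmentFits(201, map, 0.9)) // would risk overflow: stop and swap the map first
        }
      }

      With a check like this the cleaner would stop growing the map before the segment that would overflow it, rather than failing mid-segment with "Attempt to add a new entry to a full offset map".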

      Here is the stack trace for this error:
      2015-05-19 16:52:48,758 ERROR [kafka-log-cleaner-thread-0] kafka.log.LogCleaner - [kafka-log-cleaner-thread-0], Error due to
      java.lang.IllegalArgumentException: requirement failed: Attempt to add a new entry to a full offset map.
      at scala.Predef$.require(Predef.scala:233)
      at kafka.log.SkimpyOffsetMap.put(OffsetMap.scala:79)
      at kafka.log.Cleaner$$anonfun$kafka$log$Cleaner$$buildOffsetMapForSegment$1.apply(LogCleaner.scala:543)
      at kafka.log.Cleaner$$anonfun$kafka$log$Cleaner$$buildOffsetMapForSegment$1.apply(LogCleaner.scala:538)
      at scala.collection.Iterator$class.foreach(Iterator.scala:727)
      at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
      at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
      at kafka.message.MessageSet.foreach(MessageSet.scala:67)
      at kafka.log.Cleaner.kafka$log$Cleaner$$buildOffsetMapForSegment(LogCleaner.scala:538)
      at kafka.log.Cleaner$$anonfun$buildOffsetMap$3.apply(LogCleaner.scala:515)
      at kafka.log.Cleaner$$anonfun$buildOffsetMap$3.apply(LogCleaner.scala:512)
      at scala.collection.immutable.Stream.foreach(Stream.scala:547)
      at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:512)
      at kafka.log.Cleaner.clean(LogCleaner.scala:307)
      at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:221)
      at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:199)
      at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

        Attachments

        1. KAFKA-2235_v2.patch
          2 kB
          Ivan Simoneko
        2. KAFKA-2235_v1.patch
          2 kB
          Ivan Simoneko

              People

               • Assignee: ivan.simonenko (Ivan Simoneko)
               • Reporter: ivan.simonenko (Ivan Simoneko)
               • Reviewer: Jun Rao
               • Votes: 0
               • Watchers: 4
