Kafka / KAFKA-3323

Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 0.8.1.1, 0.8.2.1
    • Fix Version/s: None
    • Component/s: log
    • Labels: None

    Description

      Once the offset index contains negative offset values, the binary search used for position lookup is broken. This causes consumers of compacted topics to skip large offset intervals when bootstrapping, with serious implications for those consumers.
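To illustrate why negative entries break the lookup, here is a minimal sketch of a "largest entry less than or equal to the target" binary search over relative offsets. It is not the Kafka implementation; `LookupDemo` and `indexSlotFor` are hypothetical names, and the index is modelled as a plain array of relative offsets.

```scala
object LookupDemo {
  // Returns the slot of the largest relative offset <= target, or -1 if none.
  // Correct only if relOffsets is sorted in ascending order.
  def indexSlotFor(relOffsets: Array[Int], target: Int): Int = {
    if (relOffsets.isEmpty || relOffsets(0) > target) return -1
    var lo = 0
    var hi = relOffsets.length - 1
    while (lo < hi) {
      val mid = (lo + hi + 1) >>> 1 // round up so the loop always makes progress
      if (relOffsets(mid) <= target) lo = mid
      else hi = mid - 1
    }
    lo
  }

  // A healthy index: strictly increasing relative offsets.
  val healthy: Array[Int] = Array(0, 100, 200, 300)

  // An index whose last two entries wrapped around to negative values.
  val corrupted: Array[Int] = Array(0, 100, Int.MinValue + 5, Int.MinValue + 900)
}
```

With the healthy array, a lookup for 250 lands on slot 2. With the corrupted array, a lookup for 150 should land on slot 1, but the negative entries compare as smaller than the target, so the search walks right and returns slot 3. In a real index that slot maps to a file position far beyond the requested offset, which is consistent with consumers skipping large intervals.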

        /**
         * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries.
         */
        def append(offset: Long, position: Int) {
          inLock(lock) {
            require(!isFull, "Attempt to append to a full index (size = " + size + ").")
            if (size.get == 0 || offset > lastOffset) {
              debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
              this.mmap.putInt((offset - baseOffset).toInt)
              this.mmap.putInt(position)
              this.size.incrementAndGet()
              this.lastOffset = offset
              require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
            } else {
              throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
                .format(offset, entries, lastOffset, file.getAbsolutePath))
            }
          }
        }
      

      OffsetIndex.append assumes that (offset - baseOffset) fits in a 32-bit Int. If the LogSegment is from a compacted topic, this assumption may not hold. The result is a silent integer overflow, which stores a negative value in the index.
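The narrowing can be reproduced in isolation. The sketch below mirrors the `(offset - baseOffset).toInt` expression from `append`; `RelativeOffsetDemo`, `uncheckedRelative` and `checkedRelative` are hypothetical names used only for illustration, and the guarded variant is one possible check, not something taken from the Kafka code base.

```scala
object RelativeOffsetDemo {
  // Same narrowing as in append: Long.toInt keeps only the low 32 bits,
  // so any difference above Int.MaxValue wraps around without an error.
  def uncheckedRelative(offset: Long, baseOffset: Long): Int =
    (offset - baseOffset).toInt

  // Hypothetical guarded variant: refuse differences that do not fit in an Int.
  def checkedRelative(offset: Long, baseOffset: Long): Option[Int] = {
    val delta = offset - baseOffset
    if (delta >= 0 && delta <= Int.MaxValue) Some(delta.toInt) else None
  }
}
```

For example, `RelativeOffsetDemo.uncheckedRelative(3000000000L, 0L)` evaluates to `-1294967296`, while `checkedRelative` returns `None` for the same inputs.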

      I believe the issue is caused by the LogCleaner, specifically by the groupings produced by groupSegmentsBySize:

        /**
         * Group the segments in a log into groups totaling less than a given size. the size is enforced separately for the log data and the index data.
         * We collect a group of such segments together into a single
         * destination segment. This prevents segment sizes from shrinking too much.
         *
         * @param segments The log segments to group
         * @param maxSize the maximum size in bytes for the total of all log data in a group
         * @param maxIndexSize the maximum size in bytes for the total of all index data in a group
         *
         * @return A list of grouped segments
         */
        private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]]
      

      Since this method groups by size only, without taking baseOffset and groupMaxOffset into account, it can produce groups that, when cleaned into a single segment, span an offset range too large for the index, so the relative offsets overflow. This is more likely for topics with low key cardinality but high update volume, since you can wind up with very few cleaned records that nevertheless have very high offsets.
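One way to picture a grouping that accounts for the offset range is sketched below. This is an assumption-laden illustration, not the LogCleaner code and not necessarily how the issue was resolved: `Seg` is a hypothetical stand-in for LogSegment, only the log size is tracked (the index size limit is omitted), and a group is closed as soon as adding the next segment would push the group's offset span past Int.MaxValue.

```scala
object GroupingSketch {
  // Hypothetical stand-in for LogSegment: offsets covered and size in bytes.
  final case class Seg(baseOffset: Long, lastOffset: Long, sizeBytes: Int)

  // True if seg can join the group without exceeding maxSize and without
  // making (lastOffset - group base offset) too large for an Int.
  // `current` holds the group newest-first, so its last element is the oldest.
  private def fits(current: List[Seg], seg: Seg, maxSize: Int): Boolean = {
    val groupBase = current.last.baseOffset
    val totalSize = current.map(_.sizeBytes.toLong).sum + seg.sizeBytes
    totalSize <= maxSize && seg.lastOffset - groupBase <= Int.MaxValue
  }

  // Groups consecutive segments, starting a new group whenever either
  // the size limit or the offset-range limit would be violated.
  def group(segs: List[Seg], maxSize: Int): List[List[Seg]] = {
    val reversed = segs.foldLeft(List.empty[List[Seg]]) {
      case (current :: done, seg) if fits(current, seg, maxSize) =>
        (seg :: current) :: done
      case (acc, seg) =>
        List(seg) :: acc
    }
    reversed.map(_.reverse).reverse
  }
}
```

With three 100-byte segments whose ranges are 0 to 999, 1000 to 1999 and 3000000000 to 3000000999, and a maxSize of 1000, a size-only grouping would merge all three. This sketch puts the first two together and leaves the third in its own group.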

      Attachments

        1. log_dump.txt
          812 kB
          Michael Schiff
        2. index_dump.txt
          0.4 kB
          Michael Schiff

            People

              Assignee: Jay Kreps (jkreps)
              Reporter: Michael Schiff (michael.schiff)
              Votes: 1
              Watchers: 4
