Kafka / KAFKA-3047

Explicit offset assignment in Log.append can corrupt the log

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.9.0.0
    • Fix Version/s: 0.10.0.0
    • Component/s: log
    • Labels:
      None

      Description

      Log.append() has assignOffsets parameter, which, when set to false, should cause Kafka to use the offsets specified in the ByteBufferMessageSet and not recalculate them based on nextOffsetMetadata. However, in that function, appendInfo.firstOffset is unconditionally set to nextOffsetMetadata.messageOffset. This can cause corruption of the log in the following scenario:

      • nextOffsetMetadata.messageOffset is 2001
      • append(messageSet, assignOffsets = false) is called, where messageSet contains offsets 1001...1500
      • after the val appendInfo = analyzeAndValidateMessageSet(messages) call, appendInfo.firstOffset is 1001 and appendInfo.lastOffset is 1500
      • after the appendInfo.firstOffset = nextOffsetMetadata.messageOffset call, appendInfo.firstOffset is 2001 and appendInfo.lastOffset is 1500
      • the consistency check if(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset) passes (the second condition can never be true, because firstOffset was just set equal to nextOffsetMetadata.messageOffset) and writing proceeds
      • the message set is appended to current log segment starting at offset 2001, but the offsets in the set are 1001...1500
      • the system shuts down abruptly
      • on restart, the following unrecoverable error is reported:
      Exception in thread "main" kafka.common.InvalidOffsetException: Attempt to append an offset (1001) to position 12345 no larger than the last offset appended (1950) to xyz/00000000000000000000.index.
        at kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
        at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
        at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
        at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
        at kafka.log.LogSegment.recover(LogSegment.scala:188)
        at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
        at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
        at kafka.log.Log.loadSegments(Log.scala:160)
        at kafka.log.Log.<init>(Log.scala:90)
        at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
        at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
        at java.util.concurrent.FutureTask.run(FutureTask.java:166)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:722)
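
      The scenario above can be condensed into a minimal, self-contained Scala sketch (hypothetical names, not the real Log.append code): once firstOffset is unconditionally overwritten with the log end offset, the monotonicity guard can never reject a message set whose own offsets lag behind the log end.

```scala
// Sketch of the failure mode in KAFKA-3047; AppendInfo and the field names
// mirror the description above, but this is illustrative code only.
object CorruptionSketch {
  final case class AppendInfo(var firstOffset: Long, var lastOffset: Long, offsetsMonotonic: Boolean)

  val logEndOffset = 2001L  // plays the role of nextOffsetMetadata.messageOffset
  val appendInfo = AppendInfo(firstOffset = 1001L, lastOffset = 1500L, offsetsMonotonic = true)

  // The unconditional assignment from Log.append:
  appendInfo.firstOffset = logEndOffset

  // The guard that was supposed to reject stale offsets:
  val rejected: Boolean = !appendInfo.offsetsMonotonic || appendInfo.firstOffset < logEndOffset

  def main(args: Array[String]): Unit = {
    println(s"rejected = $rejected")                               // false after the overwrite
    println(s"stale set? ${appendInfo.lastOffset < logEndOffset}") // true: 1001...1500 lags the log end
  }
}
```

      The guard is vacuous because both sides of the comparison are now the same value, so the stale set is appended at offset 2001 while internally carrying offsets 1001...1500.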
      

      Proposed fix: the assignment appendInfo.firstOffset = nextOffsetMetadata.messageOffset should only happen inside the if (assignOffsets) branch of the code.
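
      A small Scala sketch of the guard under the proposed fix (appendAllowed is a hypothetical helper, not actual Log.scala code): the set's own first offset is only replaced by the log end offset when assignOffsets is true, so the stale offsets from the scenario above would now be rejected.

```scala
object FixedGuardSketch {
  // Returns true when the append may proceed. With the fix, firstOffset is
  // rebased to the log end only when Kafka itself assigns offsets.
  def appendAllowed(firstOffset: Long, offsetsMonotonic: Boolean,
                    logEndOffset: Long, assignOffsets: Boolean): Boolean = {
    val effectiveFirst = if (assignOffsets) logEndOffset else firstOffset
    offsetsMonotonic && effectiveFirst >= logEndOffset
  }

  def main(args: Array[String]): Unit = {
    // Scenario from the description: log end offset 2001, set carries 1001...1500
    println(appendAllowed(1001L, offsetsMonotonic = true, logEndOffset = 2001L, assignOffsets = false)) // false: rejected
    println(appendAllowed(2001L, offsetsMonotonic = true, logEndOffset = 2001L, assignOffsets = false)) // true: offsets line up
  }
}
```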


            People

            • Assignee: ijuma Ismael Juma
            • Reporter: mmakowski Maciek Makowski
            • Reviewer: Guozhang Wang
            • Votes: 0
            • Watchers: 5
