Kafka / KAFKA-698

broker may expose uncommitted data to a consumer

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: core
    • Labels: None

      Description

      We saw the following error in the log during testing. The problem seems to be that, when the high watermark was at offset 39021, the broker incorrectly exposed an uncommitted message (at offset 39022) to the client. This doesn't always happen, but it can happen when certain conditions are met, which I will explain in the comments.

      2013/01/11 00:54:42.059 ERROR [KafkaApis] [kafka-request-handler-2] [kafka] [] [KafkaApi-277] error when processing request (service_metrics,2,39022,2000000)
      java.lang.IllegalArgumentException: Attempt to read with a maximum offset (39021) less than the start offset (39022).
      at kafka.log.LogSegment.read(LogSegment.scala:105)
      at kafka.log.Log.read(Log.scala:386)
      at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:369)
      at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:327)
      at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:323)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
      at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
      at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
      at scala.collection.immutable.Map$Map1.map(Map.scala:93)
      at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:323)
      at kafka.server.KafkaApis$$anonfun$maybeUnblockDelayedFetchRequests$2.apply(KafkaApis.scala:165)
      at kafka.server.KafkaApis$$anonfun$maybeUnblockDelayedFetchRequests$2.apply(KafkaApis.scala:164)
      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
      at kafka.server.KafkaApis.maybeUnblockDelayedFetchRequests(KafkaApis.scala:164)
      at kafka.server.KafkaApis$$anonfun$handleProducerRequest$3.apply(KafkaApis.scala:186)
      at kafka.server.KafkaApis$$anonfun$handleProducerRequest$3.apply(KafkaApis.scala:185)
      at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
      at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:185)
      at kafka.server.KafkaApis.handle(KafkaApis.scala:58)
      at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
      at java.lang.Thread.run(Thread.java:619)

        Activity

        Jun Rao added a comment -

        My analysis is the following. In LogSegment.read(), the code below calculates the end position to read up to when a maxOffset is given. When maxOffset is the high watermark, we won't find its position in the file (since the offset is exclusive), so we use the size of the segment file as the end position. What can happen is that a new message gets appended between the calls to translateOffset() and messageSet.sizeInBytes(). That message (which may be uncommitted) will then be incorrectly returned to the consumer.

        // calculate the length of the message set to read based on whether or not they gave us a maxOffset
        val length =
          maxOffset match {
            case None =>
              // no max offset, just use the max size they gave unmolested
              maxSize
            case Some(offset) => {
              // there is a max offset, translate it to a file position and use that to calculate the max read size
              if(offset < startOffset)
                throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset))
              val mapping = translateOffset(offset)
              val endPosition =
                if(mapping == null)
                  messageSet.sizeInBytes() // the max offset is off the end of the log, use the end of the file
                else
                  mapping.position
              min(endPosition - startPosition.position, maxSize)
            }
          }
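
        To make the window concrete, the interleaving described above can be annotated against that snippet as follows (the offsets are the ones from the error in the description; the thread labels and step layout are only illustrative):

        // Illustrative interleaving only, not code from the patch:
        //
        //   fetch handler thread                       append path
        //   --------------------                       -----------
        //   translateOffset(39021) returns null
        //     (the high watermark has no physical
        //      position in the segment yet)
        //                                              a message at offset 39022 is
        //                                              appended, growing the file
        //   messageSet.sizeInBytes() now reflects the
        //     grown file, so endPosition covers offset
        //     39022 and the uncommitted message is
        //     handed to the consumer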

        I think the actual problem is that in Log.append we advance nextOffset before the data is actually appended to the log. This causes the problem in Log.read, since it may see an offset that doesn't exist in the segment file yet. To fix this, we will need to advance nextOffset only after the data has been appended to the log.
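
        A minimal sketch of that ordering change (the class, trait, and method names below are simplified placeholders, not the actual patch; only the "write first, then advance nextOffset" ordering is taken from the comment above):

        import java.util.concurrent.atomic.AtomicLong

        // Hypothetical stand-in for the segment append; not the real kafka.log API.
        trait SegmentLike {
          def append(baseOffset: Long, messages: Seq[Array[Byte]]): Unit
        }

        class SimplifiedLog(activeSegment: SegmentLike) {
          private val lock = new Object
          private val nextOffset = new AtomicLong(0L)

          def append(messages: Seq[Array[Byte]]): Unit = lock synchronized {
            val firstOffset = nextOffset.get
            // 1. write the message bytes into the segment file first
            activeSegment.append(firstOffset, messages)
            // 2. only after the data is physically in the segment, advance
            //    nextOffset, so a concurrent read can never be handed an offset
            //    that has no backing data in the file yet
            nextOffset.set(firstOffset + messages.size)
          }
        }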

        Jay Kreps added a comment -

        Attached is a hacky but small patch that should fix this.

        The proper fix is to refactor ByteBufferMessageSet to no longer take an AtomicLong directly, but that is not very straightforward and will be very conflict-prone with trunk. So let's hack it for now, and I will file a follow-up ticket to refactor this.
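
        As a rough illustration of that direction (purely a sketch with made-up names, not the follow-up patch): instead of handing ByteBufferMessageSet a shared AtomicLong, the log would assign a plain base offset under its own lock and the message set would derive per-message offsets from it.

        // Illustrative only: a message set that takes an immutable base offset
        // instead of a shared AtomicLong, so offset assignment stays under the
        // Log's control. Names are hypothetical, not the real API.
        final case class SimpleMessageSet(baseOffset: Long, payloads: Seq[Array[Byte]]) {
          // each message's offset is derived from the base, not from a mutable counter
          def offsets: Seq[Long] = payloads.indices.map(i => baseOffset + i)
        }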

        Jun Rao added a comment -

        +1 on the patch.

        Neha Narkhede added a comment -

        +1 on the patch


          People

          • Assignee: Jay Kreps
          • Reporter: Jun Rao
          • Votes: 0
          • Watchers: 3
