[KAFKA-698] broker may expose uncommitted data to a consumer

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: core
    • Labels: None

      Description

      We saw the following error in the log during testing. The problem seems to be that when the high watermark was at offset 39021, the broker incorrectly exposed an uncommitted message (at offset 39022) to the client. This doesn't always happen, but can happen when certain conditions are met, which I will explain in the comments.

      2013/01/11 00:54:42.059 ERROR [KafkaApis] [kafka-request-handler-2] [kafka] [] [KafkaApi-277] error when processing request (service_metrics,2,39022,2000000)
      java.lang.IllegalArgumentException: Attempt to read with a maximum offset (39021) less than the start offset (39022).
      at kafka.log.LogSegment.read(LogSegment.scala:105)
      at kafka.log.Log.read(Log.scala:386)
      at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:369)
      at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:327)
      at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:323)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
      at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
      at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
      at scala.collection.immutable.Map$Map1.map(Map.scala:93)
      at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:323)
      at kafka.server.KafkaApis$$anonfun$maybeUnblockDelayedFetchRequests$2.apply(KafkaApis.scala:165)
      at kafka.server.KafkaApis$$anonfun$maybeUnblockDelayedFetchRequests$2.apply(KafkaApis.scala:164)
      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
      at kafka.server.KafkaApis.maybeUnblockDelayedFetchRequests(KafkaApis.scala:164)
      at kafka.server.KafkaApis$$anonfun$handleProducerRequest$3.apply(KafkaApis.scala:186)
      at kafka.server.KafkaApis$$anonfun$handleProducerRequest$3.apply(KafkaApis.scala:185)
      at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
      at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:185)
      at kafka.server.KafkaApis.handle(KafkaApis.scala:58)
      at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
      at java.lang.Thread.run(Thread.java:619)
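
      To relate the numbers in the log line to the failure: the consumer had been handed offset 39022, which lay beyond the high watermark of 39021, so the subsequent bounded read rejected the request. Below is a minimal, self-contained sketch of that bound check; the names mirror the LogSegment.read snippet quoted in the comments further down, and the sketch is an illustration, not the exact 0.8 source.

      // Illustrative only: a standalone sketch of the failing bound check, using the
      // offsets from the log line above.
      object BoundCheckSketch {
        def main(args: Array[String]): Unit = {
          val startOffset = 39022L                  // offset the consumer was (incorrectly) allowed to fetch from
          val maxOffset: Option[Long] = Some(39021L) // high watermark at the time of the bounded read
          maxOffset.foreach { offset =>
            if (offset < startOffset)
              throw new IllegalArgumentException(
                "Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset))
          }
        }
      }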

        Activity

        Transition                   Time In Source Status   Execution Times   Last Executer   Last Execution Date
        Open -> Patch Available      2h 57m                  1                 Jun Rao         14/Jan/13 04:21
        Patch Available -> Open      1m 18s                  1                 Jun Rao         14/Jan/13 04:22
        Open -> Resolved             2d 17m                  1                 Jay Kreps       16/Jan/13 04:40
        Resolved -> Closed           4d 14h 2m               1                 Neha Narkhede   20/Jan/13 18:42
        Neha Narkhede made changes -
        Status: Resolved -> Closed
        Jay Kreps made changes -
        Status: Open -> Resolved
        Fix Version/s: 0.8
        Resolution: Fixed
        Neha Narkhede made changes -
        Assignee: Jay Kreps
        Neha Narkhede added a comment -

        +1 on the patch

        Jun Rao added a comment -

        +1 on the patch.

        Jay Kreps made changes -
        Attachment: KAFKA-698-v1.patch
        Jay Kreps added a comment -

        Attached is a hacky but small patch that should fix this.

        The proper fix is to refactor ByteBufferMessageSet to no longer take an AtomicLong directly, but this is not very straightforward and will be very conflict-prone with trunk. So let's hack it for now and I will file a follow-up ticket to refactor this.

        Jun Rao made changes -
        Status: Patch Available -> Open
        Jun Rao made changes -
        Comment: [ Attach a patch. It removes replicaId from javaapi.FetchRequest and restricts the scope of the constructor in scala FetchRequest that sets replicaId. ]
        Jun Rao made changes -
        Attachment: kafka-699.patch
        Jun Rao made changes -
        Status: Open -> Patch Available
        Jun Rao made changes -
        Attachment: kafka-699.patch
        Jun Rao added a comment -

        My analysis is the following. In LogSegment.read(), we have the following code that calculates the end position for maxOffset. When maxOffset is the high watermark, we won't find its position in the file (since the offset is exclusive). Therefore, we use the size of the segment file as the end position. What can happen is that a new message gets appended between when we call translateOffset and messageSet.sizeInBytes(). That message (which may be uncommitted) will then be incorrectly returned to the consumer.

        // calculate the length of the message set to read based on whether or not they gave us a maxOffset
        val length =
          maxOffset match {
            case None =>
              // no max offset, just use the max size they gave unmolested
              maxSize
            case Some(offset) => {
              // there is a max offset, translate it to a file position and use that to calculate the max read size
              if(offset < startOffset)
                throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset))
              val mapping = translateOffset(offset)
              val endPosition =
                if(mapping == null)
                  messageSet.sizeInBytes() // the max offset is off the end of the log, use the end of the file
                else
                  mapping.position
              min(endPosition - startPosition.position, maxSize)
            }
          }

        I think the actual problem is that in Log.append, we advance nextOffset before the data is actually appended to the log. This causes the problem in Log.read, since a reader may see an offset that doesn't exist in the segment file yet. To fix this, we will need to advance nextOffset after the data is appended to the log.
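
        To make the ordering fix concrete, the following is a minimal, self-contained sketch (SimpleLog and its members are hypothetical, not Kafka source) contrasting the racy ordering described above, where nextOffset is advanced before the bytes reach the segment, with the proposed ordering, where nextOffset is advanced only after the append, so a reader bounded by it can never see an offset whose data is not yet in the segment.

        // Hypothetical model, not Kafka source: a tiny log where readers bound their
        // reads by nextOffset, analogous to bounding by the end of the segment file.
        import java.util.concurrent.atomic.AtomicLong
        import scala.collection.mutable.ArrayBuffer

        class SimpleLog {
          private val nextOffset = new AtomicLong(0L)           // first offset not yet readable
          private val segment = ArrayBuffer.empty[Array[Byte]]  // stands in for the segment file

          // Racy ordering (the problem described above): the offset becomes visible
          // to readers before the corresponding bytes are in the segment.
          def appendRacy(msg: Array[Byte]): Long = {
            val offset = nextOffset.getAndIncrement()   // published too early
            segment.synchronized { segment += msg }     // data lands only afterwards
            offset
          }

          // Proposed ordering: append first, then advance nextOffset, so any offset a
          // reader can see is guaranteed to have its data in the segment.
          def append(msg: Array[Byte]): Long = segment.synchronized {
            segment += msg
            nextOffset.incrementAndGet() - 1
          }

          // Read [from, upTo) but never past nextOffset; with the fixed ordering this
          // can never hand back data that has not been fully appended.
          def read(from: Long, upTo: Long): Seq[Array[Byte]] = segment.synchronized {
            val bound = math.min(upTo, nextOffset.get())
            if (from >= bound) Seq.empty
            else segment.slice(from.toInt, bound.toInt).toSeq
          }
        }

        The patch that actually went in took a smaller route (see Jay Kreps' comment above); the sketch only illustrates the ordering invariant being argued for.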

        Jun Rao created issue -

          People

          • Assignee: Jay Kreps
          • Reporter: Jun Rao
          • Votes: 0
          • Watchers: 3
