KAFKA-725: Broker Exception: Attempt to read with a maximum offset less than start offset

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.10.0.0
    • Component/s: log
    • Labels:
      None

      Description

      I have a simple consumer that's reading from a single topic/partition pair. Running it seems to trigger these messages on the broker periodically:

      2013/01/22 23:04:54.936 ERROR [KafkaApis] [kafka-request-handler-4] [kafka] [] [KafkaApi-466] error when processing request (MyTopic,4,7951732,2097152)
      java.lang.IllegalArgumentException: Attempt to read with a maximum offset (7951715) less than the start offset (7951732).
      at kafka.log.LogSegment.read(LogSegment.scala:105)
      at kafka.log.Log.read(Log.scala:390)
      at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:372)
      at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:330)
      at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:326)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
      at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
      at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
      at scala.collection.immutable.Map$Map1.map(Map.scala:93)
      at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:326)
      at kafka.server.KafkaApis$$anonfun$maybeUnblockDelayedFetchRequests$2.apply(KafkaApis.scala:165)
      at kafka.server.KafkaApis$$anonfun$maybeUnblockDelayedFetchRequests$2.apply(KafkaApis.scala:164)
      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
      at kafka.server.KafkaApis.maybeUnblockDelayedFetchRequests(KafkaApis.scala:164)
      at kafka.server.KafkaApis$$anonfun$handleProducerRequest$2.apply(KafkaApis.scala:186)
      at kafka.server.KafkaApis$$anonfun$handleProducerRequest$2.apply(KafkaApis.scala:185)
      at scala.collection.immutable.Map$Map2.foreach(Map.scala:127)
      at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:185)
      at kafka.server.KafkaApis.handle(KafkaApis.scala:58)
      at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
      at java.lang.Thread.run(Thread.java:619)

      When I shut the consumer down, I don't see the exceptions anymore.

      This is the code that my consumer is running:
      while (true) {
        // we believe the consumer to be connected, so try and use it for a fetch request
        val request = new FetchRequestBuilder()
          .addFetch(topic, partition, nextOffset, fetchSize)
          .maxWait(Int.MaxValue)
          // TODO for super high-throughput, might be worth waiting for more bytes
          .minBytes(1)
          .build

        debug("Fetching messages for stream %s and offset %s." format (streamPartition, nextOffset))
        val messages = connectedConsumer.fetch(request)
        debug("Fetch complete for stream %s and offset %s. Got messages: %s" format (streamPartition, nextOffset, messages))

        if (messages.hasError) {
          warn("Got error code from broker for %s: %s. Shutting down consumer to trigger a reconnect." format (streamPartition, messages.errorCode(topic, partition)))
          ErrorMapping.maybeThrowException(messages.errorCode(topic, partition))
        }

        messages.messageSet(topic, partition).foreach { msg =>
          watchers.foreach(_.onMessagesReady(msg.offset.toString, msg.message.payload))
          nextOffset = msg.nextOffset
        }
      }

      Any idea what might be causing this error?


          Activity

          Neha Narkhede added a comment -

          This looks exactly like KAFKA-698. Chris, did you try with a Kafka cluster that includes a fix for KAFKA-698?

          lokesh Birla added a comment -

          Neha,

          I still see this issue in 0.8.1.1.

          https://issues.apache.org/jira/browse/KAFKA-1806

          Diwakar added a comment -

          Neha,

          We have 6 brokers and 131 partitions per topic (replication factor: 3). We recently updated to kafka_2.10-0.8.2.0 and are facing a similar issue, which is causing a lot of the errors below. Because of this, it seems producers are unable to produce to Kafka successfully.

          [2015-01-11 05:21:56.604-0700] ERROR [Replica Manager on Broker 2]: Error when processing fetch request for partition [application-access,13] offset 42748276 from consumer with correlation id 4974. Possible cause: Attempt to read with a maximum offset (42748275) less than the start offset (42748276).

          Is any solution available to fix this?

          Thanks
          Diwakar

          Manu Zhang added a comment -

          Is anyone still looking at this issue? We have run into this exception on a 4-node kafka_2.10-0.8.2.1 cluster where 4 producers produce data at a throughput of 17k messages/s on each node.

          Gwen Shapira added a comment -

          KAFKA-2477 has somewhat similar symptoms. Perhaps you are running into that? You can try applying the patch and checking if it fixes your issue.

          Jiangjie Qin added a comment -

          Gwen Shapira Manu Zhang Not sure if it is related to KAFKA-2477. KAFKA-2477 should only affect replica fetchers, and we never set maxOffset for replica fetchers. The error log here seems to be caused by a regular consumer trying to fetch beyond the high watermark. But this should not affect producing.

          Manu Zhang added a comment -

          I can reproduce this on 0.9.0.0. The error log is

          [2016-01-28 16:12:32,840] ERROR [Replica Manager on Broker 1]: Error processing fetch operation on partition [ad-events,1] offset 75510318 (kafka.server.ReplicaManager)

          I also printed the offset sent by the producer:

          time partition offset
          16:12:32.840 1 75510318

          It seems the offset is produced and consumed at the same time.

          Stig Rohde Døssing added a comment - edited

          We're seeing this on 0.8.2.2. I'm not sure Log/LogSegment handle the high watermark as gracefully as they could.

          My guess at how it's happening:
          Assume a replica set of at least 2.
          A consumer (in our case the Storm KafkaSpout) reads up to the end of the committed log, say up to message 5.
          The leader for the relevant partition then receives one or more messages (6).
          Before the new message(s) are replicated, the consumer increments its offset and fetches (from 6).
          The leader receives the fetch, sets the maxOffset for the read to the high watermark (5), and compares the end of the log to the requested offset (see https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/log/Log.scala#L482). This check passes because the end of the log is at 6.
          When the read on LogSegment is reached, it errors out because maxOffset is smaller than the start offset, which produces this error log: https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/log/LogSegment.scala#L146

          Maybe the check in Log should also verify that maxOffset is larger than offset?

          Edit: Or maybe Kafka should allow the fetch to wait until the requested offset is available, similar to how minBytes can be waited for?
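          A minimal sketch of the two checks described in this scenario, using hypothetical simplified names rather than Kafka's actual Log/LogSegment code (and following the HW semantics as described here, which are refined later in the thread): a fetch between the high watermark and the log end passes the log-level check but fails the segment-level one.

          import scala.util.Try

          object HighWatermarkRaceSketch {

            // Roughly what the log-level check does: only reject offsets beyond the log end.
            def logRead(startOffset: Long, maxOffset: Long, logEndOffset: Long): Seq[Long] = {
              if (startOffset > logEndOffset)
                throw new IllegalArgumentException(s"offset $startOffset is past the log end $logEndOffset")
              segmentRead(startOffset, maxOffset)
            }

            // Roughly what the segment-level check does: reject maxOffset < startOffset.
            def segmentRead(startOffset: Long, maxOffset: Long): Seq[Long] = {
              if (maxOffset < startOffset)
                throw new IllegalArgumentException(
                  s"Attempt to read with a maximum offset ($maxOffset) less than the start offset ($startOffset).")
              startOffset to maxOffset
            }

            def main(args: Array[String]): Unit = {
              val highWatermark = 5L // high watermark as used in the scenario above (maxOffset for consumer reads)
              val logEndOffset  = 6L // the leader has already appended offset 6, not yet replicated
              val consumerFetch = 6L // the consumer asks for the next offset it has not seen

              // Passes the log-end check (6 <= 6) but fails the segment check (5 < 6).
              println(Try(logRead(consumerFetch, highWatermark, logEndOffset)))
            }
          }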

          ASF GitHub Bot added a comment -

          GitHub user srdo opened a pull request:

          https://github.com/apache/kafka/pull/1178

          KAFKA-725: Change behavior of Log/LogSegment when attempting read on an offset that's above high watermark.

          This should make Log.read act the same when startOffset is larger than maxOffset as it would if startOffset was larger than logEndOffset. The current behavior can result in an IllegalArgumentException from LogSegment if a consumer attempts to fetch an offset above the high watermark which is present in the leader's log. It seems more correct if Log.read presents the view of the log to consumers as if it simply ended at maxOffset (high watermark).

          I've tried to describe an example scenario of this happening here https://issues.apache.org/jira/browse/KAFKA-725?focusedCommentId=15221673&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15221673

          I'm not sure I understand why ReplicaManager sets maxOffset to the high watermark, and not high watermark + 1. Isn't the high watermark the last committed message, and readable by consumers?

          Tests passed for me locally on the second try; it seems the first run just hit a flaky test.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/srdo/kafka KAFKA-725

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/kafka/pull/1178.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #1178


          commit 5c7d583ec1af0892e9fadc4bbdcbeaa94390524e
          Author: Stig Rohde Døssing <sdo@it-minds.dk>
          Date: 2016-04-02T12:20:50Z

          KAFKA-725: Throw OffsetOutOfRangeException when reading from Log with maxOffset > startOffset

          commit 5546433916d49b30b0869964a779e1af189be0ce
          Author: Stig Rohde Døssing <sdo@it-minds.dk>
          Date: 2016-04-02T13:37:22Z

          KAFKA-725: Return empty message set if reading from Log with maxOffset+1 == startOffset

          commit 5808b31828d3703729569476217880971bf279af
          Author: Stig Rohde Døssing <sdo@it-minds.dk>
          Date: 2016-04-02T14:09:40Z

          KAFKA-725: Return only message offset when reading one beyond maxOffset


          Stig Rohde Døssing added a comment -

          Never mind this; I misunderstood the high watermark to be the last committed offset. It seems to be the last committed offset + 1. There's still a minor issue if a consumer requests an offset that's in the log but above the high watermark, which the PR should fix.

          ASF GitHub Bot added a comment -

          Github user srdo closed the pull request at:

          https://github.com/apache/kafka/pull/1178

          ASF GitHub Bot added a comment -

          GitHub user srdo reopened a pull request:

          https://github.com/apache/kafka/pull/1178

          KAFKA-725: Change behavior of Log/LogSegment when attempting read on an offset that's above high watermark.

          This should make Log.read act the same when startOffset is larger than maxOffset as it would if startOffset was larger than logEndOffset. The current behavior can result in an IllegalArgumentException from LogSegment if a consumer attempts to fetch an offset above the high watermark which is present in the leader's log. It seems more correct if Log.read presents the view of the log to consumers as if it simply ended at maxOffset (high watermark).

          I've tried to describe an example scenario of this happening here https://issues.apache.org/jira/browse/KAFKA-725?focusedCommentId=15221673&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15221673

          I'm not sure I understand why ReplicaManager sets maxOffset to the high watermark, and not high watermark + 1. Isn't the high watermark the last committed message, and readable by consumers?

          Tests passed for me locally on the second try; it seems the first run just hit a flaky test.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/srdo/kafka KAFKA-725

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/kafka/pull/1178.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #1178


          commit c7bab99b77b71c73380d473facda1138799e42a6
          Author: Stig Rohde Døssing <sdo@it-minds.dk>
          Date: 2016-04-02T12:20:50Z

          KAFKA-725: Throw OffsetOutOfRangeException when reading from Log with maxOffset > startOffset

          commit 4f5b415651ec45d3040c22393d24293de4f2cfd0
          Author: Stig Rohde Døssing <sdo@it-minds.dk>
          Date: 2016-04-02T23:29:02Z

          KAFKA-725: Put check for HW from consumer in ReplicaManager.readFromLocalLog instead of Log.read


          Guozhang Wang added a comment -

          Stig Rohde Døssing I think your reasoning still makes sense, because a producer usually produces in batches. So, following your case, after receiving a batch of 5 messages the log end offset could then be 5 + 5 + 1 = 11, but before the batch is replicated to the followers the high watermark is still 5 + 1 = 6. Hence this check will fail.

          Mudit Kumar added a comment -

          Hi,

          We are seeing the exception below in our Kafka logs on one of the broker IDs.

          [2016-04-08 07:59:58,486] ERROR [Replica Manager on Broker 3]: Error processing fetch operation on partition [subscribed_product_logs,17] offset 483780 (kafka.server.ReplicaManager)
          java.lang.IllegalStateException: Failed to read complete buffer for targetOffset 483780 startPosition 958861516 in /kafka/kafka-logs/subscribed_product_logs-17/00000000000000378389.log
          at kafka.log.FileMessageSet.searchFor(FileMessageSet.scala:133)
          at kafka.log.LogSegment.translateOffset(LogSegment.scala:105)
          at kafka.log.LogSegment.read(LogSegment.scala:126)
          at kafka.log.Log.read(Log.scala:506)
          at kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:536)
          at kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:507)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
          at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
          at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
          at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
          at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
          at scala.collection.AbstractTraversable.map(Traversable.scala:104)
          at kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:507)
          at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:462)
          at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:431)
          at kafka.server.KafkaApis.handle(KafkaApis.scala:69)
          at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
          at java.lang.Thread.run(Thread.java:745)
          [2016-04-08 07:59:58,486] ERROR [Replica Manager on Broker 3]: Error processing fetch operation on partition [subscribed_product_logs,8] offset 592637 (kafka.server.ReplicaManager)
          java.lang.IllegalStateException: Failed to read complete buffer for targetOffset 592637 startPosition 780606424 in /kafka/kafka-logs/subscribed_product_logs-8/00000000000000505731.log
          at kafka.log.FileMessageSet.searchFor(FileMessageSet.scala:133)
          at kafka.log.LogSegment.translateOffset(LogSegment.scala:105)
          at kafka.log.LogSegment.read(LogSegment.scala:126)
          at kafka.log.Log.read(Log.scala:506)
          at kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:536)
          at kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:507)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
          at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
          at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
          at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
          at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
          at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
          at scala.collection.AbstractTraversable.map(Traversable.scala:104)
          at kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:507)
          at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:462)
          at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:431)
          at kafka.server.KafkaApis.handle(KafkaApis.scala:69)
          at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
          at java.lang.Thread.run(Thread.java:745)

          It seems this is related to the same bug. Any update on this?

          ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/kafka/pull/1178

          Guozhang Wang added a comment -

          Issue resolved by pull request 1178
          https://github.com/apache/kafka/pull/1178

          Stig Rohde Døssing added a comment -

          Guozhang Wang Makes sense. I'm wondering if it would be better for the request to be put into purgatory then? If the request hits in between the high watermark and the end of the log, we can reasonably expect that offset to be readable shortly, whereas if the client gets a general OffsetOutOfRangeException, it might make more sense for the client to restart at either end of the log.

          What I mean is, basically: what does proper handling of this exception look like on the client side now?

          Guozhang Wang added a comment -

          Usually client code should gracefully handle OffsetOutOfRangeException by requesting the current log end offset and retrying the fetch. Note that this handling may just cause the exception to be triggered multiple times until replication completes and the HW advances; in most cases this is fine since it is only transient. But I agree that a more ideal solution is to park the request in purgatory, so that we can reduce the round trips spent retrying in this case as well.
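          A hedged sketch of that client-side handling, using toy stand-ins (SimpleLog, fetch, and logEndOffset are hypothetical names, not the real Kafka client API): on an offset-out-of-range error, re-query the log end offset and retry the fetch.

          object FetchRetrySketch {

            final class OffsetOutOfRange(msg: String) extends RuntimeException(msg)

            // Toy stand-in for a partition log on the broker.
            final class SimpleLog(messages: Vector[String]) {
              def logEndOffset: Long = messages.length.toLong
              def fetch(offset: Long): Seq[(Long, String)] =
                if (offset > logEndOffset)
                  throw new OffsetOutOfRange(s"offset $offset is beyond the log end $logEndOffset")
                else
                  messages.zipWithIndex.drop(offset.toInt).map { case (m, i) => (i.toLong, m) }
            }

            // On an out-of-range error, reset to the current log end offset and retry the fetch.
            def fetchWithReset(log: SimpleLog, offset: Long): Seq[(Long, String)] =
              try log.fetch(offset)
              catch {
                case _: OffsetOutOfRange =>
                  // Transient while replication completes and the HW advances; reset and retry.
                  log.fetch(log.logEndOffset)
              }

            def main(args: Array[String]): Unit = {
              val log = new SimpleLog(Vector("a", "b", "c"))
              println(fetchWithReset(log, 10L)) // resets to the log end and returns an empty set
            }
          }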

          Stig Rohde Døssing added a comment -

          Guozhang Wang Okay, returning the error should be fine then. I can't really think of a case where this error can happen if the client is well behaved and unclean leader election is turned off. If the client never increments its offset by more than 1 past the most recently consumed message, it shouldn't be possible for it to request an offset higher than the high watermark, since the most recent offset it can have consumed is HW - 1.

          Jun Rao added a comment -

          Stig Rohde Døssing, thanks for the patch. It's still not very clear to me how a consumer can trigger the IllegalArgumentException even without the patch. The broker only returns messages up to the high watermark (HW) to the consumer. So, the offset from a consumer should always be <= HW. The problem can only occur if a consumer uses an offset > HW, but <= the log end offset, which should never happen in a normal consumer.

          Jiangjie Qin added a comment -

          Jun Rao I asked this question in the RB. It seems the consumer in this case is not the Java consumer. Theoretically, a Java consumer can only fetch beyond the HW when unclean leader election occurs.

          Jun Rao added a comment -

          Reopening this JIRA since the fix exposes a new issue. When the leader switches (say, due to leader balancing), the new leader's HW can actually be smaller than the previous leader's HW, since the HW is propagated asynchronously. The new leader's log end offset is >= the previous leader's HW, and eventually its HW will move to its log end offset. Before that happens, if a consumer fetches data using the previous leader's HW, then with the patch the consumer will get an OffsetOutOfRangeException and thus has to reset its offset, which is bad. Without the patch, the consumer will get an empty response instead.

          So, it seems that we should revert the changes in this patch.
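          A small sketch of that leader-switch scenario as described above, with hypothetical simplified names (not the broker code), contrasting the two behaviours: with the patch the consumer is forced to reset its offset; without it, the consumer just gets an empty response and retries.

          object LeaderSwitchSketch {

            // The new leader has all the data (logEndOffset >= old leader's HW), but its own
            // HW lags briefly because the HW is propagated asynchronously.
            final case class Leader(highWatermark: Long, logEndOffset: Long)

            // With the patch: fetching above the HW is treated as out of range.
            def fetchWithPatch(leader: Leader, fetchOffset: Long): Either[String, Seq[Long]] =
              if (fetchOffset > leader.highWatermark)
                Left("OffsetOutOfRangeException: consumer has to reset its offset")
              else
                Right(fetchOffset until leader.highWatermark)

            // Without the patch: fetching above the HW just returns an empty response.
            def fetchWithoutPatch(leader: Leader, fetchOffset: Long): Seq[Long] =
              if (fetchOffset > leader.highWatermark) Seq.empty
              else fetchOffset until leader.highWatermark

            def main(args: Array[String]): Unit = {
              val oldLeaderHw = 10L
              val newLeader   = Leader(highWatermark = 8L, logEndOffset = 10L)

              println(fetchWithPatch(newLeader, oldLeaderHw))    // Left(...): bad, forces a reset
              println(fetchWithoutPatch(newLeader, oldLeaderHw)) // List(): consumer simply retries
            }
          }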

          Guozhang Wang added a comment -

          Jun, thanks for pointing it out. While reverting this change, I'm thinking we should also change https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L147 to return an empty response instead of throwing an exception. What do you think?

          Jun Rao added a comment -

          Yes, I agree. If the requested offset is > MaxOffset, it's better to just return an empty response instead of throwing an IllegalStateException. We can add a comment on why we want to do that. Also, while you are there, could you fix the following comment above LogSegment.read()? maxPosition is not optional.

          • @param maxPosition An optional maximum position in the log segment that should be exposed for read.
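          A simplified sketch of the segment-level change being discussed, with hypothetical names and signatures (not the actual LogSegment.read code): when the requested offset is beyond the allowed maximum, return an empty message set instead of throwing.

          object EmptyReadSketch {

            final case class FetchDataInfo(startOffset: Long, messages: Seq[Long])

            def read(startOffset: Long, maxOffset: Long, logEndOffset: Long): FetchDataInfo =
              // Previously this case threw an exception; now it yields an empty response instead.
              if (maxOffset < startOffset || startOffset >= logEndOffset)
                FetchDataInfo(startOffset, Seq.empty)
              else
                FetchDataInfo(startOffset, startOffset until math.min(maxOffset + 1, logEndOffset))

            def main(args: Array[String]): Unit =
              println(read(startOffset = 6L, maxOffset = 5L, logEndOffset = 7L)) // FetchDataInfo(6,List())
          }
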
          ASF GitHub Bot added a comment -

          GitHub user guozhangwang opened a pull request:

          https://github.com/apache/kafka/pull/1327

          HOTFIX: follow-up on KAFKA-725 to remove the check and return empty response instead of throw exceptions

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/guozhangwang/kafka K725r

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/kafka/pull/1327.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #1327


          commit 9fdfe9ce1a0242f78775cbc5e24fc4a059a07296
          Author: Guozhang Wang <wangguoz@gmail.com>
          Date: 2016-05-05T19:03:30Z

          follow-up on KAFKA-725


          Stig Rohde Døssing added a comment -

          Thanks for fixing this. The scenario Jun describes is probably a better match for the times we saw the exception originally. We're using Storm's storm-kafka component to consume, and it shouldn't go beyond the HW if the HW never moves backwards. It seems plausible that the logs coincided with leader failover for us.

          ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/kafka/pull/1327

          Ismael Juma added a comment -

          Guozhang Wang's PR was merged to trunk and 0.10.


            People

            • Assignee: Stig Rohde Døssing
            • Reporter: Chris Riccomini
            • Reviewer: Guozhang Wang
            • Votes: 0
            • Watchers: 16
