Kafka / KAFKA-725

Broker Exception: Attempt to read with a maximum offset less than start offset

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.8.0
    • Fix Version/s: None
    • Component/s: log
    • Labels: None

      Description

      I have a simple consumer that's reading from a single topic/partition pair. Running it seems to trigger these messages on the broker periodically:

      2013/01/22 23:04:54.936 ERROR [KafkaApis] [kafka-request-handler-4] [kafka] [] [KafkaApi-466] error when processing request (MyTopic,4,7951732,2097152)
      java.lang.IllegalArgumentException: Attempt to read with a maximum offset (7951715) less than the start offset (7951732).
      at kafka.log.LogSegment.read(LogSegment.scala:105)
      at kafka.log.Log.read(Log.scala:390)
      at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:372)
      at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:330)
      at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:326)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
      at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
      at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
      at scala.collection.immutable.Map$Map1.map(Map.scala:93)
      at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:326)
      at kafka.server.KafkaApis$$anonfun$maybeUnblockDelayedFetchRequests$2.apply(KafkaApis.scala:165)
      at kafka.server.KafkaApis$$anonfun$maybeUnblockDelayedFetchRequests$2.apply(KafkaApis.scala:164)
      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
      at kafka.server.KafkaApis.maybeUnblockDelayedFetchRequests(KafkaApis.scala:164)
      at kafka.server.KafkaApis$$anonfun$handleProducerRequest$2.apply(KafkaApis.scala:186)
      at kafka.server.KafkaApis$$anonfun$handleProducerRequest$2.apply(KafkaApis.scala:185)
      at scala.collection.immutable.Map$Map2.foreach(Map.scala:127)
      at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:185)
      at kafka.server.KafkaApis.handle(KafkaApis.scala:58)
      at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
      at java.lang.Thread.run(Thread.java:619)

      When I shut the consumer down, I don't see the exceptions anymore.
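
The exception text suggests a range check of roughly the following shape: the offset named in the fetch request (7951732) is ahead of the highest offset the broker was prepared to serve at that moment (7951715). The sketch below is only an illustration under that reading. `OffsetRangeCheck` and `validate` are hypothetical names, this is not the code in `LogSegment.scala`, and the report does not show how the broker arrives at the maximum offset.

```scala
import scala.util.Try

// Hypothetical stand-in for a range check; NOT Kafka's actual implementation.
object OffsetRangeCheck {
  /** Throws if the requested start offset lies beyond the maximum readable offset. */
  def validate(startOffset: Long, maxOffset: Long): Unit = {
    if (maxOffset < startOffset)
      throw new IllegalArgumentException(
        "Attempt to read with a maximum offset (%d) less than the start offset (%d)."
          .format(maxOffset, startOffset))
  }

  def main(args: Array[String]): Unit = {
    // Offsets copied from the broker log in this report.
    val outcome = Try(validate(startOffset = 7951732L, maxOffset = 7951715L))
    println(outcome)
  }
}
```

With the values from the log, the requested start offset is 17 ahead of the maximum, so the check fails; a request at or below the maximum passes.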

      This is the code that my consumer is running:
      while (true) {
        // we believe the consumer to be connected, so try and use it for a fetch request
        val request = new FetchRequestBuilder()
          .addFetch(topic, partition, nextOffset, fetchSize)
          .maxWait(Int.MaxValue)
          // TODO for super high-throughput, might be worth waiting for more bytes
          .minBytes(1)
          .build

        debug("Fetching messages for stream %s and offset %s." format (streamPartition, nextOffset))
        val messages = connectedConsumer.fetch(request)
        debug("Fetch complete for stream %s and offset %s. Got messages: %s" format (streamPartition, nextOffset, messages))

        if (messages.hasError) {
          warn("Got error code from broker for %s: %s. Shutting down consumer to trigger a reconnect." format (streamPartition, messages.errorCode(topic, partition)))
          ErrorMapping.maybeThrowException(messages.errorCode(topic, partition))
        }

        messages.messageSet(topic, partition).foreach(msg => {
          watchers.foreach(_.onMessagesReady(msg.offset.toString, msg.message.payload))
          nextOffset = msg.nextOffset
        })
      }
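
The offset bookkeeping in the loop above can be sketched without a broker. This is a simplified model, not the Kafka client: `Msg`, `fetch`, and `drain` are hypothetical, the "log" is an in-memory vector, and `nextOffset` is assumed to be the message's offset plus one.

```scala
// Simplified, broker-free model of the fetch loop's offset tracking.
// All names here are hypothetical and exist only for illustration.
object FetchLoopSketch {
  // Stand-in for a fetched message: an offset and a payload.
  final case class Msg(offset: Long, payload: String) {
    // Assumption: the next offset to request is the one right after this message.
    def nextOffset: Long = offset + 1
  }

  // In-memory "fetch": up to maxMessages messages with offset >= fromOffset.
  def fetch(log: Vector[Msg], fromOffset: Long, maxMessages: Int): Vector[Msg] =
    log.filter(_.offset >= fromOffset).take(maxMessages)

  // Repeats fetches until one comes back empty and returns the offset
  // that the following request would ask for.
  def drain(log: Vector[Msg], startOffset: Long, maxMessages: Int): Long = {
    var nextOffset = startOffset
    var batch = fetch(log, nextOffset, maxMessages)
    while (batch.nonEmpty) {
      batch.foreach(msg => nextOffset = msg.nextOffset)
      batch = fetch(log, nextOffset, maxMessages)
    }
    nextOffset
  }
}
```

Once the consumer has caught up, its next request names the offset one past the last message it saw. In the reported code that request then waits on the broker (`maxWait(Int.MaxValue)`, `minBytes(1)`) until new data arrives, which is consistent with the trace showing the failure inside `maybeUnblockDelayedFetchRequests` during a produce request.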

      Any idea what might be causing this error?

          Activity

          Neha Narkhede added a comment -

          This looks exactly like KAFKA-698. Chris, did you try with a Kafka cluster that includes a fix for KAFKA-698?


            People

            • Assignee: Jay Kreps
            • Reporter: Chris Riccomini
            • Votes: 0
            • Watchers: 2
