KAFKA-994

High level consumer doesn't throw an exception when the message it is trying to fetch exceeds the configured fetch size


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: consumer
    • Labels: None

    Description

      The high level consumer code is supposed to throw an exception when it encounters a message that exceeds its configured max message size. The relevant code from ConsumerIterator.scala is:

        // if we just updated the current chunk and it is empty that means the fetch size is too small!
        if(currentDataChunk.messages.validBytes == 0)
          throw new MessageSizeTooLargeException("Found a message larger than the maximum fetch size of this consumer on topic " +
            "%s partition %d at fetch offset %d. Increase the fetch size, or decrease the maximum message size the broker will allow."
            .format(currentDataChunk.topicInfo.topic, currentDataChunk.topicInfo.partitionId, currentDataChunk.fetchOffset))
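      For context, here is roughly where that exception would surface from an application's point of view. This is just a sketch against the 0.8 high-level consumer API; the property values, topic and group names are illustrative:

        import java.util.Properties
        import kafka.consumer.{Consumer, ConsumerConfig}
        import kafka.common.MessageSizeTooLargeException

        val props = new Properties()
        props.put("zookeeper.connect", "localhost:2181")  // illustrative address
        props.put("group.id", "size-check-group")         // illustrative group
        props.put("fetch.message.max.bytes", "1024")      // deliberately small fetch size

        val connector = Consumer.create(new ConsumerConfig(props))
        val stream = connector.createMessageStreams(Map("test" -> 1))("test").head

        try {
          // Iterating the stream drives ConsumerIterator, which is where the
          // MessageSizeTooLargeException above is supposed to be thrown.
          for (msgAndMeta <- stream)
            println(new String(msgAndMeta.message))
        } catch {
          case e: MessageSizeTooLargeException =>
            // With this bug, we never get here: the oversized chunk is dropped
            // before it reaches the iterator and the consumer just blocks.
            System.err.println(e.getMessage)
        }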

      The problem is that KAFKA-846 changed PartitionTopicInfo.enqueue:

        def enqueue(messages: ByteBufferMessageSet) {
      -   val size = messages.sizeInBytes
      +   val size = messages.validBytes
          if(size > 0) {

      i.e. chunks that contain only messages that are too big (validBytes == 0) will never even be enqueued, so they never reach the too-large message check in ConsumerIterator.

      I've attached a patch that passes our tests...
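      The patch itself is attached rather than inlined. As a rough sketch of the shape of a fix (not necessarily what messageSize.patch does), PartitionTopicInfo.enqueue could still enqueue a chunk whose raw size is non-zero even though validBytes is 0, so the empty chunk reaches ConsumerIterator and triggers the exception above. Here chunkQueue and fetchedOffset are assumed to be the existing fields of PartitionTopicInfo in 0.8:

        def enqueue(messages: ByteBufferMessageSet) {
          val size = messages.validBytes
          if(size > 0) {
            // ... existing bookkeeping and chunkQueue.put(...) unchanged ...
          } else if(messages.sizeInBytes > 0) {
            // A non-empty fetch with no complete valid message means a single
            // message exceeded the fetch size: enqueue the chunk anyway so
            // ConsumerIterator sees validBytes == 0 and throws.
            chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get))
          }
        }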

      Attachments

        1. messageSize.patch
          0.7 kB
          Sam Meder


          People

            Assignee: smeder Sam Meder
            Reporter: smeder Sam Meder
            Votes: 0
            Watchers: 3
