KAFKA-1718

"Message Size Too Large" error when only small messages produced with Snappy

    Details

    • Type: Bug
    • Status: Open
    • Priority: Critical
    • Resolution: Unresolved
    • Affects Version/s: 0.8.1.1
    • Fix Version/s: None
    • Component/s: core
    • Labels: None

      Description

      I'm the primary author of the Go bindings, and while I originally received this as a bug against my bindings, I'm coming to the conclusion that it's a bug in the broker somehow.

      Specifically, take a look at the last two kafka packets in the following packet capture: https://dl.dropboxusercontent.com/u/171647/kafka.pcapng (you will need a trunk build of Wireshark to fully decode the kafka part of the packets).

      The produce request contains two partitions on one topic. Each partition has one message set (977205 bytes and 967362 bytes respectively). Each message set is a sequential collection of snappy-compressed messages, each of size 46899 bytes. When uncompressed, each of those messages contains a message set of 999600 bytes, which is itself a sequence of uncompressed 1024-byte messages.
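
      For illustration, here is the nesting those numbers imply, as a small Scala sketch. It assumes the 26 bytes of per-message overhead discussed in the comments below (12-byte log overhead plus a 14-byte header for a null-keyed message); the object name and the overhead constant are editorial assumptions, not part of the report.

      object PayloadArithmetic extends App {
        val innerPayload   = 1024    // each uncompressed message body, per the report
        val perMsgOverhead = 26      // assumption: 12-byte log overhead + 14-byte header (null key)
        val innerSetBytes  = 999600  // uncompressed message set inside each compressed message
        val innerCount     = innerSetBytes / (innerPayload + perMsgOverhead)
        println(innerCount)                                   // 952 inner messages per compressed message
        println(innerCount * (innerPayload + perMsgOverhead)) // 999600, consistent with the report
      }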

      However, the broker responds to this with a MessageSizeTooLarge error, full stacktrace from the broker logs being:
      kafka.common.MessageSizeTooLargeException: Message size is 1070127 bytes which exceeds the maximum configured message size of 1000012.
      at kafka.log.Log$$anonfun$append$1.apply(Log.scala:267)
      at kafka.log.Log$$anonfun$append$1.apply(Log.scala:265)
      at scala.collection.Iterator$class.foreach(Iterator.scala:727)
      at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
      at kafka.log.Log.append(Log.scala:265)
      at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:354)
      at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:376)
      at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:366)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
      at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
      at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
      at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
      at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
      at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
      at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
      at scala.collection.AbstractTraversable.map(Traversable.scala:105)
      at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:366)
      at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:292)
      at kafka.server.KafkaApis.handle(KafkaApis.scala:185)
      at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
      at java.lang.Thread.run(Thread.java:695)

      Since as far as I can tell none of the sizes in the actual produced packet exceed the defined maximum, I can only assume that the broker is miscalculating something somewhere and throwing the exception improperly.

      This issue can be reliably reproduced using an out-of-the-box binary download of 0.8.1.1 and the following gist: https://gist.github.com/eapache/ce0f15311c605a165ce7 (you will need to use the `producer-ng` branch of the Sarama library).

      I am happy to provide any more information you might need, or to do relevant experiments etc.

        Activity

        sriharsha Sriharsha Chintalapani added a comment -

        Evan Huus, Kafka uses ByteBufferMessageSet
        https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
        which adds a LogOverhead to each message (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala#L54),
        and in Log it compares ByteBufferMessageSet.sizeInBytes against the configured message size, which won't be equal to the actual message size sent.
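
        A minimal Scala sketch of the check being described (simplified, not the exact broker code): each shallow entry's size, which includes the 12-byte log overhead on top of the message itself, is compared against the configured maximum.

        import kafka.message.{ByteBufferMessageSet, MessageSet}
        import kafka.common.MessageSizeTooLargeException

        // Simplified sketch: compare each shallow entry's size (message + 12-byte
        // log overhead) against the configured maximum, as the broker's Log does.
        def checkMessageSizes(messages: ByteBufferMessageSet, maxMessageSize: Int): Unit =
          for (entry <- messages.shallowIterator) {
            val size = MessageSet.entrySize(entry.message)
            if (size > maxMessageSize)
              throw new MessageSizeTooLargeException(
                "Message size is %d bytes which exceeds the maximum configured message size of %d."
                  .format(size, maxMessageSize))
          }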

        eapache Evan Huus added a comment -

        LogOverhead is only 12 bytes; none of the values I produce are within 12 bytes of the limit, and none of them is anywhere near the 1070127 bytes the broker is reporting.

        sriharsha Sriharsha Chintalapani added a comment -

        Evan Huus there is additional data being added per message too: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/Message.scala#L172 and https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/message/Message.scala#L99

        eapache Evan Huus added a comment - edited

        That additional data is only 26 bytes, and is already included in the numbers I put in my original report.

        junrao Jun Rao added a comment -

        Another thing that we do on the broker is to assign a new offset to each (uncompressed) message and recompress those messages again. It may be possible that because of the newly assigned offsets, the new message set doesn't compress as well as before and thus exceeds the limit.

        eapache Evan Huus added a comment - edited

        That sounds plausible.

        1. How do I verify if that is/isn't the problem I'm seeing? Is there some piece of backtrace or breakpoint I can check or something?
        2. If that is the problem, what is a client supposed to do about it? Leave a few KiB spare and hope that that's enough? Is there no way for a client using compression to be sure that the broker will actually accept the payload (unless presumably the uncompressed payload is already small enough)?

        Edit: actually, that can't be it. From my original report "When uncompressed, each message contains a message set of 999600 bytes". So unless the recompression on the broker's end added a substantial amount of data (which is improbable; the messages were all 0s)...

        junrao Jun Rao added a comment -

        In trunk, we actually do a size check before and after recompression. You can probably set a breakpoint in Log.append() and see where the size limit is violated.

        noxis Wojciech Kuranowski added a comment - edited

        I have compiled Kafka 0.8.2 with different error messages for validation and revalidation, and it seems that this issue is triggered during revalidation after recompression. In my case:
        "kafka.common.MessageSizeTooLargeException: revalidate - Message size is 3382345 bytes which exceeds the maximum configured message size of 1000000."

        It's strange that the message after recompression is 3 times bigger than the limit. Is the broker miscalculating something?

        sriharsha Sriharsha Chintalapani added a comment -

        I ran sizeInBytes for "test".getBytes:
        "test".getBytes size: 4
        "test" Message (Message.scala) size: 18
        "test" ByteBufferMessageSet size: 30

        For each message, additional data is added:

        • 1. 4 byte CRC32 of the message
        • 2. 1 byte "magic" identifier to allow format changes, value is 2 currently
        • 3. 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used)
        • 4. 4 byte key length, containing length K
        • 5. K byte key
        • 6. 4 byte payload length, containing length V
        • 7. V byte payload

        For the message "test" with a null key, the size comes to 18, and if you add this message to a ByteBufferMessageSet it will be 30 (12 being the LogOverhead).
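
        Worked arithmetic matching those numbers, as a small Scala sketch; the field sizes are the ones listed above, assuming a null key (the object name is just for illustration).

        object EntrySizeSketch extends App {
          val header      = 4 + 1 + 1 + 4 + 4               // CRC + magic + attributes + key length + payload length = 14
          val logOverhead = 8 + 4                            // offset + message size fields = 12
          val payload     = "test".getBytes("UTF-8").length  // 4
          println(header + payload)                          // 18 -> Message size
          println(logOverhead + header + payload)            // 30 -> ByteBufferMessageSet entry size
        }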

        sriharsha Sriharsha Chintalapani added a comment -

        Wojciech Kuranowski, can you share your test, please?

        noxis Wojciech Kuranowski added a comment -

        My test is almost the same as the one in the Description of this issue.

        But I have some interesting things to show.
        I print "msg-before:" and MessageSet.entrySize(m) for every message in the "analyzeAndValidateMessageSet" method,
        and then I print "msg-after:" and MessageSet.entrySize(m) for every message in the "validMessages" collection in the "append" method.

        As you can see, there are 4 messages "before" and just one "after". It looks like the broker decompresses everything, squashes it into one message, and then calculates the size. I am not a Kafka expert, so this is just my guess.

        [2014-10-21 20:06:45,299] INFO msg-before: 452610 (kafka.log.Log)
        [2014-10-21 20:06:45,331] INFO msg-before: 450961 (kafka.log.Log)
        [2014-10-21 20:06:45,337] INFO msg-before: 449214 (kafka.log.Log)
        [2014-10-21 20:06:45,338] INFO msg-before: 384778 (kafka.log.Log)
        [2014-10-21 20:06:45,573] INFO msg-after: 3377212 (kafka.log.Log)
        [2014-10-21 20:06:45,577] ERROR [KafkaApi-1] Error processing ProducerRequest with correlation id 0 from client kafka-log-producer on partition [adstream2,0] (kafka.server.KafkaApis)
        kafka.common.MessageSizeTooLargeException: revalidate - Message size is 3377212 bytes which exceeds the maximum configured message size of 1000012.

        [2014-10-21 20:06:46,285] INFO msg-before: 464793 (kafka.log.Log)
        [2014-10-21 20:06:46,286] INFO msg-before: 465262 (kafka.log.Log)
        [2014-10-21 20:06:46,286] INFO msg-before: 465959 (kafka.log.Log)
        [2014-10-21 20:06:46,287] INFO msg-before: 380474 (kafka.log.Log)
        [2014-10-21 20:06:46,364] INFO msg-after: 3383301 (kafka.log.Log)
        [2014-10-21 20:06:46,365] ERROR [KafkaApi-1] Error processing ProducerRequest with correlation id 0 from client kafka-log-producer on partition [adstream2,0] (kafka.server.KafkaApis)
        kafka.common.MessageSizeTooLargeException: revalidate - Message size is 3383301 bytes which exceeds the maximum configured message size of 1000012.
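
        Hypothetical Scala diagnostics along these lines (not the actual patch), using the shallow iterator so each compressed wrapper message is measured as one entry:

        import kafka.message.{ByteBufferMessageSet, MessageSet}

        // Print each shallow entry's size, e.g. with tag "msg-before" on the set as
        // received and "msg-after" on the set the broker rebuilds after recompression.
        def logEntrySizes(tag: String, messages: ByteBufferMessageSet): Unit =
          for (entry <- messages.shallowIterator)
            println("%s: %d".format(tag, MessageSet.entrySize(entry.message)))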

        junrao Jun Rao added a comment -

        Yes, the broker does recompress all messages in the messageSet passed to Log.append together into a single compressed message. In the java/scala producer, it's always the case that a messageSet for a partition in a produce request contains a single compressed message. I guess your Go producer can send multiple compressed messages for a single partition. Is there any benefit in doing that?
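
        A conceptual Scala sketch of that behaviour (not the actual broker code path), using the public ByteBufferMessageSet API: the deep iterator yields every inner message from every compressed wrapper, and they are all re-compressed into one wrapper, whose entry size is what gets checked against message.max.bytes.

        import kafka.message.{ByteBufferMessageSet, Message, MessageSet, SnappyCompressionCodec}

        def squashLikeTheBroker(incoming: ByteBufferMessageSet): ByteBufferMessageSet = {
          // the default iterator is "deep": it decompresses each wrapper message
          val innerMessages: Seq[Message] = incoming.iterator.map(_.message).toSeq
          // re-compress everything into a single wrapper message
          new ByteBufferMessageSet(SnappyCompressionCodec, innerMessages: _*)
        }

        // The size later compared against message.max.bytes is the single wrapper's entry size, e.g.:
        //   squashLikeTheBroker(ms).shallowIterator.map(e => MessageSet.entrySize(e.message)).toList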

        eapache Evan Huus added a comment - edited

        I guess your go producer can send multiple compressed messages for a single partition

        Yes, that's exactly what it's doing. If it collects enough messages for a partition that they would exceed message.max.bytes when compressed together, it batches them and sends each batch as a compressed message in the same messageSet.

        Is there any benefit in doing that?

        More-or-less to get around the limit on message sizes, which I guess doesn't work so well

        A few points on this then:

        • Currently (with default broker settings) you can produce just under 100MiB (socket.request.max.bytes) of messages to the broker uncompressed in a single request, but you can't produce that same batch of messages in compressed form, since the resulting compressed message would almost certainly be larger than 1MB (message.max.bytes). This discrepancy seems odd to me.
        • I understand the desire to limit "real" message sizes to prevent misbehaving producers from causing problems. However, I don't think the limit is particularly useful when applied to the compressed "meta-messages"; why shouldn't they be arbitrarily large, within the limits of socket.request.max.bytes?
        • I don't think the broker should assume there's only one compressed message per message-set; if a message-set contains multiple compressed messages, it should process them one-at-a-time and store each individually, rather than trying to do them all at once.

        Thanks for all your help!

        Edit: If for some reason you decide to keep the current behaviour as-is, please document this in the protocol spec on the wiki, since as far as I can tell the spec gives no reason to believe that multiple compressed messages will be combined, and that the combined length will be relevant.
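
        A minimal Scala sketch of the client-side batching strategy described above (not Sarama's actual code), using snappy-java and ignoring the wrapper message's own framing overhead; the function name and greedy packing are editorial assumptions.

        import org.xerial.snappy.Snappy
        import scala.collection.mutable.ArrayBuffer

        // Greedily pack raw messages into batches whose compressed size stays under
        // the broker's message.max.bytes; each batch becomes one compressed message.
        def batchUnderLimit(messages: Seq[Array[Byte]], maxCompressedBytes: Int): Seq[Array[Byte]] = {
          val batches = ArrayBuffer.empty[Array[Byte]]
          var current = ArrayBuffer.empty[Array[Byte]]
          for (m <- messages) {
            val candidate = (current :+ m).flatten.toArray
            if (current.nonEmpty && Snappy.compress(candidate).length > maxCompressedBytes) {
              batches += Snappy.compress(current.flatten.toArray) // close the current batch
              current = ArrayBuffer(m)
            } else {
              current += m
            }
          }
          if (current.nonEmpty) batches += Snappy.compress(current.flatten.toArray)
          batches
        }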

        guozhang Guozhang Wang added a comment -

        Since the reason we add the "max.message.size" restriction on the broker side is the consumer's fetch size, if we can change the behavior in the new consumer so that when it gets a partial message from the broker it dynamically increases its fetch size, then we can remove this config in both the broker and the new producer. Jun Rao, are there any blockers to doing that?
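
        A minimal Scala sketch of that idea (not the Kafka consumer's actual code), with a hypothetical fetch function and FetchResult type:

        final case class FetchResult(bytes: Array[Byte], isPartialMessage: Boolean)

        // If the fetch only returned part of a message, double the fetch size and
        // retry, up to a configured ceiling.
        def fetchWithGrowth(fetch: Int => FetchResult, initialSize: Int, maxSize: Int): FetchResult = {
          var size   = initialSize
          var result = fetch(size)
          while (result.isPartialMessage && size < maxSize) {
            size   = math.min(size * 2, maxSize)
            result = fetch(size)
          }
          result
        }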

        eapache Evan Huus added a comment - edited

        when it gets a partial message from the broker it will dynamically increase its fetch size

        like https://github.com/Shopify/sarama/blob/master/consumer.go#L236-L253 ?

        guozhang Guozhang Wang added a comment -

        Yes, that is what I was thinking about.

        junrao Jun Rao added a comment -

        Guozhang Wang, removing the max message size may be a bigger change. We would not only have to patch both the regular and follower consumer, but probably also log compaction and tools that read the logs directly. Also, having a max message size can be a good thing since it limits memory consumption in the reader.

        As for this issue, we can change the behavior on the broker. However, it's a bit tricky since, currently, we don't have an API to create a ByteBufferMessageSet with more than one already-compressed message. So, for now, we can probably just document the behavior in the wiki.

        Evan,

        If you want to help make the wiki change, I can give you permission. Just let me know your wiki id. Thanks,

        eapache Evan Huus added a comment -

        Jun Rao I already have wiki permissions, so I made the relevant change. While I'm in the neighbourhood, what is the expected value of the MagicByte field? The spec doesn't clarify, and my library has been leaving it at 0 without problems thus far, but Sriharsha Chintalapani mentioned earlier that the value should be 2?

        junrao Jun Rao added a comment -

        Evan,

        Thanks for updating the wiki. The MagicByte is currently expected to be 0. We don't validate it on the broker at the moment, though. However, we will if we evolve the message format in the future.

        jkreps Jay Kreps added a comment -

        Jun Rao, Guozhang Wang is this still ongoing?

        guozhang Guozhang Wang added a comment -

        Jay Kreps I think the conclusion is that this is a valid scenario for the message size limit, and Evan has already updated the wiki page explaining its root cause. We can close this ticket now.

        eapache Evan Huus added a comment -

        Guozhang Wang, Jay Kreps my understanding is that while this is a known limitation of the current design (and the wiki now reflects that limitation), this ticket is still open to track support for multiple compressed message-sets in a single produce request. The points I made in my comment on Oct 21st still stand.

        I'm not sure if there's been any progress in the actual implementation of that support.

        swapneshgandhi@gmail.com Swapnesh Gandhi added a comment -

        I am seeing the same issue.
        Is the compressed messageSet split into small messages at some point? If so, when?
        Does this impact replication in any way? I am thinking performance-wise.

        If I bump up max.message.size to solve this problem, will that impact performance, given that I have many small messages rather than a single large message?


          People

          • Assignee: Unassigned
          • Reporter: eapache Evan Huus
          • Votes: 3
          • Watchers: 13
