Kafka
  1. Kafka
  2. KAFKA-767

Message Size check should be done after assigning the offsets

    Details

    • Type: Bug Bug
    • Status: Closed
    • Priority: Blocker Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.8.0
    • Fix Version/s: 0.8.0
    • Component/s: log
    • Labels:

      Description

      Replica fetcher thread fails with messageSizeTooLarge exception. One theory is that this check is happening before decompress - assign offsets - compress phase. Hence the final compressed size can be different from that obtained from the produce request. This causes replica fetcher thread to be permanently down and prevents the broker from being in sync.

      2013/02/20 02:19:25.447 ERROR [ReplicaFetcherThread] [ReplicaFetcherThread-0-274] [kafka] [] [ReplicaFetcherThread-0-274], Error due to
      kafka.common.MessageSizeTooLargeException: Message size is 1000028 bytes which exceeds the maximum configured message size of 1000000.
      at kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:353)
      at kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:339)
      at scala.collection.Iterator$class.foreach(Iterator.scala:631)
      at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
      at kafka.log.Log.analyzeAndValidateMessageSet(Log.scala:339)
      at kafka.log.Log.append(Log.scala:262)
      at kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:52)
      at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:130)
      at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:113)
      at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
      at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
      at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
      at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:113)
      at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
      at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

      1. KAFKA-767-v1.patch
        3 kB
        Sriram Subramanian
      2. KAFKA-767-v2.patch
        4 kB
        Sriram Subramanian

        Activity

        Transition Time In Source Status Execution Times Last Executer Last Execution Date
        Open Open Patch Available Patch Available
        1d 19h 4m 1 Neha Narkhede 22/Feb/13 16:19
        Patch Available Patch Available Resolved Resolved
        6h 43m 1 Neha Narkhede 22/Feb/13 23:03
        Resolved Resolved Closed Closed
        2s 1 Neha Narkhede 22/Feb/13 23:03
        Neha Narkhede made changes -
        Status Resolved [ 5 ] Closed [ 6 ]
        Neha Narkhede made changes -
        Status Patch Available [ 10002 ] Resolved [ 5 ]
        Resolution Fixed [ 1 ]
        Hide
        Neha Narkhede added a comment -

        committed v2

        Show
        Neha Narkhede added a comment - committed v2
        Sriram Subramanian made changes -
        Attachment KAFKA-767-v2.patch [ 12570509 ]
        Hide
        Sriram Subramanian added a comment -

        Neha - fixed
        Jun - I dont see a strong reason to make it a method. It neither seems reusable nor large enough to wrap it in a method.

        Show
        Sriram Subramanian added a comment - Neha - fixed Jun - I dont see a strong reason to make it a method. It neither seems reusable nor large enough to wrap it in a method.
        Neha Narkhede made changes -
        Status Open [ 1 ] Patch Available [ 10002 ]
        Hide
        Neha Narkhede added a comment -

        Thanks for the patch, it looks good. Just one comment, can you break long log4j line ?

        Show
        Neha Narkhede added a comment - Thanks for the patch, it looks good. Just one comment, can you break long log4j line ?
        Hide
        Jun Rao added a comment -

        Thanks for the patch. It looks good. Should we put the size verifying logic in a separate private method?

        Also, the patch doesn't apply to 0.8. Could you rebase?

        Show
        Jun Rao added a comment - Thanks for the patch. It looks good. Should we put the size verifying logic in a separate private method? Also, the patch doesn't apply to 0.8. Could you rebase?
        Hide
        Sriram Subramanian added a comment -

        Changes

        core/src/main/scala/kafka/log/Log.scala | 18 +++++++++++-------
        1 file changed, 11 insertions, 7 deletions

        Show
        Sriram Subramanian added a comment - Changes core/src/main/scala/kafka/log/Log.scala | 18 +++++++++++------- 1 file changed, 11 insertions , 7 deletions
        Sriram Subramanian made changes -
        Attachment KAFKA-767-v1.patch [ 12570440 ]
        Neha Narkhede made changes -
        Component/s log [ 12320320 ]
        Neha Narkhede made changes -
        Labels p1
        Sriram Subramanian made changes -
        Field Original Value New Value
        Priority Major [ 3 ] Blocker [ 1 ]
        Sriram Subramanian created issue -

          People

          • Assignee:
            Sriram Subramanian
            Reporter:
            Sriram Subramanian
          • Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

            • Created:
              Updated:
              Resolved:

              Development