Kafka / KAFKA-277

Add a shallow iterator to ByteBufferMessageSet, used only by the SynchProducer.verifyMessageSize() function

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 0.7.1
    • Component/s: None
    • Labels: None

      Description

      The shallow iterator traverses only the first-level messages of a ByteBufferMessageSet: compressed messages are not decompressed, and their inner messages are not treated individually.
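To make the shallow/deep distinction concrete, here is a minimal sketch using a hypothetical model, not Kafka's actual classes: `Entry`, `Plain`, `Compressed`, `shallowIterator` and `deepIterator` are illustrative names only. A top-level entry is either a plain message or a compressed wrapper holding inner messages.

```scala
object ShallowVsDeepSketch {
  // Hypothetical model (not Kafka's API): a message set is a list of top-level
  // entries; a Compressed entry stands for a wrapper whose payload would
  // decompress into the inner messages it carries.
  sealed trait Entry
  final case class Plain(payload: String) extends Entry
  final case class Compressed(inner: List[Plain]) extends Entry

  // Shallow: yield the top-level entries as-is; wrappers are not opened.
  def shallowIterator(set: List[Entry]): Iterator[Entry] = set.iterator

  // Deep: replace each compressed wrapper by the messages inside it.
  def deepIterator(set: List[Entry]): Iterator[Plain] =
    set.iterator.flatMap {
      case p: Plain          => Iterator.single(p)
      case Compressed(inner) => inner.iterator
    }
}
```

For a set holding one plain message and one wrapper around two messages, the shallow iterator yields 2 entries while the deep iterator yields 3 messages.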

        Activity

        Yang Ye added a comment -

        The lastConnectionTime adjustment is also in this patch.

        The following file is also affected. Curiously, in the repository copy the variable "event" has no binding (it appears to be undefined); the build only succeeds after I rename it to "events".

        --- core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (revision 1245727)
        +++ core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (working copy)
        @@ -38,8 +38,8 @@
             processedEvents = cbkHandler.beforeSendingData(events)
         
             if(logger.isTraceEnabled)
        -      processedEvents.foreach(event => trace("Handling event for Topic: %s, Partition: %d"
        -        .format(event.getTopic, event.getPartition)))
        +      processedEvents.foreach(events => trace("Handling event for Topic: %s, Partition: %d"
        +        .format(events.getTopic, events.getPartition)))
         
             send(serialize(collate(processedEvents), serializer), syncProducer)
           }
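For reference on the Scala semantics involved: the identifier to the left of `=>` is the lambda's own parameter, which `foreach` binds to each element in turn, so `event` needs no definition elsewhere. Renaming it to `events` still compiles, but inside the lambda it shadows the enclosing `events`. The sketch below assumes a hypothetical `Event` class with `topic`/`partition` fields in place of the real event type's `getTopic`/`getPartition`.

```scala
import scala.collection.mutable.ArrayBuffer

object ForeachBindingSketch {
  // Hypothetical stand-in for a producer event; not Kafka's actual class.
  final case class Event(topic: String, partition: Int)

  def describe(processedEvents: Seq[Event]): Seq[String] = {
    val lines = ArrayBuffer.empty[String]
    // `event` is the lambda parameter: foreach binds it to each element.
    processedEvents.foreach(event =>
      lines += "Handling event for Topic: %s, Partition: %d".format(event.topic, event.partition))
    lines.toSeq
  }

  def describeShadowed(events: Seq[Event]): Seq[String] = {
    val lines = ArrayBuffer.empty[String]
    // Also compiles, but this lambda parameter hides the method parameter `events`.
    events.foreach(events =>
      lines += "Handling event for Topic: %s, Partition: %d".format(events.topic, events.partition))
    lines.toSeq
  }
}
```

Both versions produce the same trace lines; only readability differs.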

        Jun Rao added a comment -

        Some comments:
        1. The variable event is simply the lambda parameter that foreach binds to each item in the sequence. There is no need to rename it, since each item in processedEvents is a single event.
        2. In ByteBufferMessageSet, instead of duplicating code in shallowIterator, could we rename deepIterator to internalIterator and add a flag that selects shallow or deep iteration? In general, we don't want to expose the shallow iterator externally, so it's better to add a verifyMessageSize method to ByteBufferMessageSet that uses the shallow iterator internally.
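The suggested design can be sketched as follows, again under a hypothetical model rather than Kafka's real classes: `MessageSetSketch`, `MessageSizeTooLargeException` and the `sizeInBytes` values are illustrative assumptions. One private iterator takes an `isShallow` flag; public iteration stays deep, and only the member `verifyMessageSize` uses the shallow mode, checking the top-level entries (compressed wrappers included) that are actually sent.

```scala
object FlaggedIteratorSketch {
  // Hypothetical model: sizes stand in for serialized byte lengths. A
  // Compressed entry's size is its compressed payload size, supplied by the
  // caller here (no real compression is performed).
  sealed trait Entry { def sizeInBytes: Int }
  final case class Plain(sizeInBytes: Int) extends Entry
  final case class Compressed(sizeInBytes: Int, inner: List[Plain]) extends Entry

  // Hypothetical exception name, not Kafka's.
  final class MessageSizeTooLargeException(msg: String) extends RuntimeException(msg)

  final class MessageSetSketch(entries: List[Entry]) {
    // Public iteration is deep; the shallow mode is not exposed.
    def iterator: Iterator[Entry] = internalIterator(isShallow = false)

    // Single implementation; the flag chooses shallow or deep traversal.
    private def internalIterator(isShallow: Boolean): Iterator[Entry] =
      if (isShallow) entries.iterator
      else entries.iterator.flatMap {
        case c: Compressed => c.inner.iterator
        case p: Plain      => Iterator.single(p)
      }

    // Validates the top-level entries, so a wrapper larger than the limit is
    // rejected even when each inner message would fit on its own.
    def verifyMessageSize(maxMessageSize: Int): Unit =
      internalIterator(isShallow = true).foreach { e =>
        if (e.sizeInBytes > maxMessageSize)
          throw new MessageSizeTooLargeException(
            s"entry of ${e.sizeInBytes} bytes exceeds limit of $maxMessageSize")
      }
  }
}
```

Keeping the flag on a private method avoids duplicating the traversal code while leaving the deep iterator as the only public view.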

        Jun Rao added a comment -

        Also, please add a unit test for this. Use a maximum message size that is larger than each individual uncompressed message but smaller than the compressed message.
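One way to derive such a limit is sketched below. This is a standalone illustration, not the actual test: the three 100-byte payloads are hypothetical, and gzip of their concatenation via `java.util.zip.GZIPOutputStream` merely stands in for Kafka's compressed wrapper message. Seeded random bytes are used because they barely compress, which keeps the compressed blob larger than any single payload.

```scala
import java.io.ByteArrayOutputStream
import java.util.zip.GZIPOutputStream
import scala.util.Random

object SizeLimitTestSketch {
  // Three hypothetical 100-byte payloads of seeded random (incompressible) bytes.
  val payloads: List[Array[Byte]] = {
    val rnd = new Random(42)
    List.fill(3)(Array.fill[Byte](100)(rnd.nextInt(256).toByte))
  }

  // Stand-in for the compressed wrapper: gzip of all payloads concatenated.
  val wrapperSize: Int = {
    val bytes = new ByteArrayOutputStream()
    val gz = new GZIPOutputStream(bytes)
    payloads.foreach(p => gz.write(p))
    gz.close()
    bytes.size()
  }

  // Limit just above the largest individual message, below the wrapper.
  val maxMessageSize: Int = payloads.map(_.length).max + 1

  // A deep check (individual messages) would wrongly accept this set...
  def deepCheckPasses: Boolean = payloads.forall(_.length <= maxMessageSize)
  // ...while a shallow check (what is actually sent) rejects it.
  def shallowCheckPasses: Boolean = wrapperSize <= maxMessageSize
}
```

With this limit, only a check over the shallow (top-level) messages catches the oversized compressed message.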

        Yang Ye added a comment -

        In ByteBufferMessageSet, internal_iterator() now takes a flag that selects deep or shallow behavior, and verifyMessageSize() has been moved into the class as a member function.

        The unit test is added as a separate test function in SynchProducerTest.scala.

        Jun Rao added a comment -

        Thanks for the patch. It looks good. Just committed to trunk.


          People

          • Assignee: Yang Ye
          • Reporter: Yang Ye
          • Votes: 0
          • Watchers: 0
