Kafka / KAFKA-732

MirrorMaker with shallow.iterator.enable=true produces unreadable messages

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 0.8.0, 0.8.1
    • Fix Version/s: 0.8.0
    • Component/s: core, producer
    • Labels: None

      Description

      I am trying to use MirrorMaker between two 0.8 clusters.

      When using shallow.iterator.enable=true on the consumer side, the performance gain is big (when incoming messages are compressed). The producer does not complain, but it writes the messages uncompressed, without the compression flag.

      If you:

      • enable compression on the producer, things obviously get worse, since the data gets compressed twice (the wiki warns about this);
      • disable compression, the compressed messages are written in bulk inside an uncompressed message, which makes them unreadable.
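The two failure modes can be reproduced with a toy Java model. The class ShallowMirrorModel, its one-byte codec flag and its length-prefixed framing are hypothetical simplifications, not Kafka's message format or API. The model assumes that shallow iteration hands MirrorMaker the still-compressed payload of each wrapper message as a plain byte[].

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Toy model, NOT Kafka's wire format: a message is one codec byte
// (0 = uncompressed, 1 = gzip) followed by its payload.
final class ShallowMirrorModel {
    static final byte NONE = 0, GZIP = 1;

    static byte[] utf8(String s) { return s.getBytes(StandardCharsets.UTF_8); }

    static byte[] message(byte codec, byte[] payload) {
        byte[] m = new byte[payload.length + 1];
        m[0] = codec;
        System.arraycopy(payload, 0, m, 1, payload.length);
        return m;
    }
    static byte[] payload(byte[] message) { return Arrays.copyOfRange(message, 1, message.length); }

    static byte[] gzip(byte[] data) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) { gz.write(data); }
        catch (IOException e) { throw new UncheckedIOException(e); }
        return bos.toByteArray();
    }
    static byte[] gunzip(byte[] data) {
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(data))) { return gz.readAllBytes(); }
        catch (IOException e) { throw new UncheckedIOException(e); }
    }

    // A message set is a length-prefixed concatenation of messages.
    static byte[] frame(List<byte[]> messages) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            for (byte[] m : messages) { out.writeInt(m.length); out.write(m); }
        } catch (IOException e) { throw new UncheckedIOException(e); }
        return bos.toByteArray();
    }
    static List<byte[]> unframe(byte[] data) {
        List<byte[]> messages = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            while (in.available() > 0) {
                byte[] m = new byte[in.readInt()];
                in.readFully(m);
                messages.add(m);
            }
        } catch (IOException e) { throw new UncheckedIOException(e); }
        return messages;
    }

    // Producer: with compression on, the whole message set is gzipped and
    // stored as the payload of a single GZIP wrapper message.
    static List<byte[]> produce(List<byte[]> values, boolean compress) {
        List<byte[]> plain = new ArrayList<>();
        for (byte[] v : values) plain.add(message(NONE, v));
        if (!compress) return plain;
        List<byte[]> wrapped = new ArrayList<>();
        wrapped.add(message(GZIP, gzip(frame(plain))));
        return wrapped;
    }

    // Deep iteration: decompress GZIP wrappers and return the inner values.
    static List<byte[]> consumeDeep(List<byte[]> log) {
        List<byte[]> values = new ArrayList<>();
        for (byte[] m : log) {
            if (m[0] != GZIP) { values.add(payload(m)); continue; }
            for (byte[] inner : unframe(gunzip(payload(m)))) values.add(payload(inner));
        }
        return values;
    }

    // Shallow iteration: return each top-level payload as-is, so a GZIP
    // wrapper yields its still-compressed bytes.
    static List<byte[]> consumeShallow(List<byte[]> log) {
        List<byte[]> values = new ArrayList<>();
        for (byte[] m : log) values.add(payload(m));
        return values;
    }

    public static void main(String[] args) {
        List<byte[]> source = produce(List.of(utf8("a"), utf8("b")), true);
        List<byte[]> mirrored = consumeShallow(source); // one gzip blob per wrapper
        System.out.println(consumeDeep(source).size());                   // 2: "a" and "b"
        System.out.println(consumeDeep(produce(mirrored, false)).size()); // 1: the raw gzip blob
        System.out.println(consumeDeep(produce(mirrored, true)).size());  // 1: still the gzip blob
    }
}
```

Running main prints 2, 1, 1: the source topic yields the two original values, but with either producer setting a deep-iterating consumer on the target only gets the compressed blob back.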

      If I am following the current code path correctly, from MirrorMaker to the produce request, there is no way for the producer to know whether a message is deep or shallow. So how did this work in 0.7?

      Here is the code path as I read it (correct me if I'm wrong):

      1. MirrorMakerThread.run(): create KeyedMessage[Array[Byte],Array[Byte]](topic, message)
      2. Producer.send() -> DefaultEventHandler.handle()
      3. DefaultEventHandler.serialize(): use DefaultEncoder for the message (does nothing)
      4. DefaultEventHandler.dispatchSerializedData():
        4.1 DefaultEventHandler.partitionAndCollate(): group messages by broker, then by topic/partition
        4.2 DefaultEventHandler.dispatchSerializedData(): iterate over each broker
        4.3 DefaultEventHandler.groupMessagesToSet(): create one ByteBufferMessageSet per topic/partition, grouping all its messages together and compressing them if needed
        4.4 DefaultEventHandler.send(): send all the ByteBufferMessageSets for this broker in one ProduceRequest

      The gist is that in DefaultEventHandler.groupMessagesToSet(), you can't tell whether the raw message in KeyedMessage.message is shallow or not. So I think I missed something... It also doesn't seem possible to send a batch of deep messages in one ProduceRequest.
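The producer path in the numbered steps can be sketched as a toy Java model (Java 16+, for records). Every name here (ProducerPathSketch, Outgoing, TopicPartition, MessageSet, buildRequests) is hypothetical, the partition is assumed to be already chosen, and the codec is a single configured value; these are simplifications, not DefaultEventHandler's real code. What the sketch illustrates is the point above: by the time messages are grouped, each value is just a byte[], so nothing per message can tell groupMessagesToSet that a value is already a compressed wrapper payload.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of the 0.8 producer path in the steps above. All names are
// hypothetical simplifications, not the real DefaultEventHandler code.
final class ProducerPathSketch {
    enum Codec { NONE, GZIP }

    // Stand-in for KeyedMessage[Array[Byte], Array[Byte]]; the partition is
    // assumed to be already chosen, to keep the sketch short.
    record Outgoing(String topic, int partition, byte[] value) {}

    record TopicPartition(String topic, int partition) {}

    // Stand-in for ByteBufferMessageSet: one codec for the whole set.
    record MessageSet(Codec codec, List<byte[]> values) {}

    // Step 4.1: group values by leader broker, then by topic/partition.
    // Step 3 is implicit: a DefaultEncoder-style encoder leaves byte[] as is.
    static Map<Integer, Map<TopicPartition, List<byte[]>>> partitionAndCollate(
            List<Outgoing> messages, Map<TopicPartition, Integer> leaders) {
        Map<Integer, Map<TopicPartition, List<byte[]>>> byBroker = new LinkedHashMap<>();
        for (Outgoing m : messages) {
            TopicPartition tp = new TopicPartition(m.topic(), m.partition());
            Integer broker = leaders.get(tp);
            if (broker == null) throw new IllegalStateException("no leader for " + tp);
            byBroker.computeIfAbsent(broker, b -> new LinkedHashMap<>())
                    .computeIfAbsent(tp, t -> new ArrayList<>())
                    .add(m.value());
        }
        return byBroker;
    }

    // Step 4.3: one message set per topic/partition. A byte[] carries no flag
    // saying it is already a compressed wrapper payload, so the only input to
    // the compression decision is the configured codec.
    static Map<TopicPartition, MessageSet> groupMessagesToSet(
            Map<TopicPartition, List<byte[]>> perPartition, Codec configured) {
        Map<TopicPartition, MessageSet> sets = new LinkedHashMap<>();
        perPartition.forEach((tp, values) -> sets.put(tp, new MessageSet(configured, values)));
        return sets;
    }

    // Steps 4.2 and 4.4: one request per broker, carrying the message sets of
    // every partition that broker leads (the network send itself is omitted).
    static Map<Integer, Map<TopicPartition, MessageSet>> buildRequests(
            List<Outgoing> messages, Map<TopicPartition, Integer> leaders, Codec configured) {
        Map<Integer, Map<TopicPartition, MessageSet>> requests = new LinkedHashMap<>();
        partitionAndCollate(messages, leaders).forEach(
                (broker, perPartition) -> requests.put(broker, groupMessagesToSet(perPartition, configured)));
        return requests;
    }
}
```

The design choice to key the outer map by broker mirrors step 4.4: everything a broker leads goes out in one request, so a batch of already-compressed values would still be treated as ordinary bytes under the configured codec.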

      I would love to provide a patch (or, even better, to be told that I'm doing it wrong), since I can easily test it on my test clusters, but I will need guidance here.

          Activity

          Jun Rao added a comment -

          Thanks for the review. Committed to 0.8.

          Neha Narkhede added a comment -

          +1

          Jun Rao added a comment -

          Attached a patch that removes the shallow iteration option from the consumer.

          Neha Narkhede added a comment -

          Moving this out of 0.8, as per the discussion on the mailing list.

          Jun Rao added a comment -

          It seems that supporting shallow iteration in 0.8 is non-trivial. The main reason is that the encoder API changed from encode(event: T) => Message in 0.7 to encode(event: T) => byte[] in 0.8. In 0.7, MirrorMaker could take a compressed message from the source and simply pass it to the producer. In 0.8, there is no easy way to do that.

          So I suggest that we remove the shallowIteration option from ConsumerConfig and revisit this issue after 0.8.
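The effect of the encoder signature change can be sketched with a toy Java model. EncoderApiSketch, its Message record, the interfaces EncoderV07/EncoderV08 and the produceV07/produceV08 helpers are all hypothetical renderings of the two signatures quoted above, not Kafka's real classes; the 0.8-style producer is assumed to run with compression disabled.

```java
// Toy model, NOT Kafka's real classes: a message is a codec flag plus payload.
final class EncoderApiSketch {
    static final byte NONE = 0, GZIP = 1;

    record Message(byte codec, byte[] payload) {}

    // 0.7 shape: encode(event: T) => Message
    interface EncoderV07<T> { Message encode(T event); }

    // 0.8 shape: encode(event: T) => byte[]
    interface EncoderV08<T> { byte[] encode(T event); }

    // 0.7-style mirror: the event is the (possibly compressed) source message,
    // so an identity encoder hands it to the producer with its codec intact.
    static final EncoderV07<Message> PASS_THROUGH_V07 = m -> m;

    // 0.8-style mirror: the encoder can only return bytes; here it returns the
    // source payload, and the source codec flag has nowhere to go.
    static final EncoderV08<Message> PASS_THROUGH_V08 = Message::payload;

    // 0.7-style producer: sends whatever Message the encoder returns.
    static <T> Message produceV07(T event, EncoderV07<T> encoder) {
        return encoder.encode(event);
    }

    // 0.8-style producer: wraps the encoded bytes in a fresh message whose
    // codec comes from producer config (uncompressed here, as in the
    // "compression disabled" case from the description).
    static <T> Message produceV08(T event, EncoderV08<T> encoder) {
        return new Message(NONE, encoder.encode(event));
    }
}
```

With the 0.7 shape, produceV07 returns the source message with its GZIP flag; with the 0.8 shape, produceV08 returns an uncompressed message whose payload is still compressed bytes, which is the unreadable output reported in the description.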


            People

            • Assignee: Jun Rao
            • Reporter: Maxime Brugidou
            • Votes: 3
            • Watchers: 5
