Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-732

MirrorMaker with shallow.iterator.enable=true produces unreadble messages

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Closed
    • Blocker
    • Resolution: Fixed
    • 0.8.0, 0.8.1
    • 0.8.0
    • core, producer
    • None

    Description

      Trying to use MirrorMaker between two 0.8 clusters

      When using shallow.iterator.enable=true on the consumer side, the performance gain is big (when incoming messages are compressed) and the producer does not complain but write the messages uncompressed without the compression flag.

      If you try:

      • enable compression on the producer, it obviously makes things worse since the data get double-compressed (the wiki warns about this)
      • disable compression and the compressed messages are written in bulk in an uncompressed message, thus making it unreadable.

      If I follow correctly the current state of code from MirrorMaker to the produce request, there is no way for the producer to know whether the message is deep or not. So I wonder how it worked on 0.7?

      Here is the code as i read it (correct me if i'm wrong):

      1. MirrorMakerThread.run(): create KeyedMessage[Array[Byte],Array[Byte]](topic, message)
      2. Producer.send() -> DefaultEventHandler.handle()
      3. DefaultEventHandler.serialize(): use DefaultEncoder for the message (does nothing)
      4. DefaultEventHandler.dispatchSerializedData():
      4.1 DefaultEventHandler.partitionAndCollate(): group messages by broker/partition/topic
      4.2 DefaultEventHandler.dispatchSerializeData(): cycle through each broker
      4.3 DefaultEventHandler.groupMessagesToSet(): Create a ByteBufferMessageSet for each partition/topic grouping all the messages together, and compressing them if needed
      4.4 DefaultEventHandler.send(): send the ByteBufferMessageSets for this broker in one ProduceRequest

      The gist is that in DEH.groupMessagesToSet(), you don't know wether the raw message in KeyedMessage.message is shallow or not. So I think I missed something... Also it doesn't seem possible to send batch of deep messages in one ProduceRequest.

      I would love to provide a patch (or if you tell me that i'm doing it wrong, it's even better), since I can easily test it on my test clusters but I will need guidance here.

      Attachments

        1. kafka-732.patch
          5 kB
          Jun Rao

        Issue Links

          Activity

            People

              junrao Jun Rao
              brugidou Maxime Brugidou
              Votes:
              3 Vote for this issue
              Watchers:
              5 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: