Kafka / KAFKA-3428

Remove metadata sync bottleneck from mirrormaker's producer


Details

    • Type: Improvement
    • Status: Patch Available
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 0.9.0.1
    • Fix Version/s: None
    • Component/s: mirrormaker
    • Labels: None

    Description

      Due to synchronization on the single producer, MM in a setup with 32 consumer threads could not send more than
      358k msg/sec and hence could not saturate the NIC. Profiling showed that producer.send takes 0.080 ms on average, which explains the 358k msg/sec ceiling: at ~0.080 ms per call, each thread can issue at most about 12,500 sends/sec, so 32 threads top out around 400k msg/sec. The following explains the bottleneck in producer.send and suggests how to improve it.

      The current implementation of MM relies on a single producer. For EACH message, producer.send() calls waitOnMetadata, which runs the following code (containsTopic and add are synchronized methods of Metadata):

              // add topic to metadata topic list if it is not there already.
              if (!this.metadata.containsTopic(topic))
                  this.metadata.add(topic);
      

      Although this code is mostly a no-op, containsTopic is synchronized, so it becomes the bottleneck in MM: every producer thread contends on the same Metadata monitor for every message.
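
      For context, here is a simplified sketch of the relevant parts of org.apache.kafka.clients.Metadata in the 0.9.x client (the real class has more fields and methods): the topic set is a plain HashSet guarded by the object monitor, so containsTopic, add, and fetch from all producer threads serialize on the same lock.

          import java.util.HashSet;
          import java.util.Set;
          import org.apache.kafka.common.Cluster;

          public class Metadata {
              // topics the producer cares about; a plain HashSet guarded by "this"
              private final Set<String> topics = new HashSet<String>();
              private Cluster cluster = Cluster.empty();

              public synchronized boolean containsTopic(String topic) {
                  return this.topics.contains(topic);
              }

              public synchronized void add(String topic) {
                  this.topics.add(topic);
              }

              public synchronized Cluster fetch() {
                  return this.cluster;
              }
          }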

      Profiling highlights this bottleneck:

      100.0% - 65,539 ms kafka.tools.MirrorMaker$MirrorMakerThread.run
        18.9% - 12,403 ms org.apache.kafka.clients.producer.KafkaProducer.send
        13.8% - 9,056 ms org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata
        12.1% - 7,933 ms org.apache.kafka.clients.Metadata.containsTopic
        1.7% - 1,088 ms org.apache.kafka.clients.Metadata.fetch
        2.6% - 1,729 ms org.apache.kafka.clients.Metadata.fetch
        2.2% - 1,442 ms org.apache.kafka.clients.producer.internals.RecordAccumulator.append
      

      After replacing this check with a no-op, another run of the profiler shows that fetch is the next bottleneck:

      org.xerial.snappy.SnappyNative.arrayCopy	 132 s (54 %)	n/a	n/a
      	java.lang.Thread.run	 50,776 ms (21 %)	n/a	n/a
      	org.apache.kafka.clients.Metadata.fetch	 20,881 ms (8 %)	n/a	n/a
        6.8% - 16,546 ms org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata
        6.8% - 16,546 ms org.apache.kafka.clients.producer.KafkaProducer.send
        6.8% - 16,546 ms kafka.tools.MirrorMaker$MirrorMakerProducer.send
      

      However, the fetch method does not need to be synchronized:

          public synchronized Cluster fetch() {
              return this.cluster;
          }
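
      One caveat when dropping the lock: unless the cluster field is made volatile (or published through some other safe mechanism), an unsynchronized fetch() could return a stale Cluster. A minimal sketch of that variant (an assumption about how the change could look, not necessarily what the patch does):

          private volatile Cluster cluster = Cluster.empty();

          public Cluster fetch() {
              return this.cluster; // volatile read, no monitor acquisition
          }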
      

      Removing sync from the fetch method shows that this bottleneck has disappeared as well:

      org.xerial.snappy.SnappyNative.arrayCopy	 249 s (78 %)	n/a	n/a
      	org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel	 24,489 ms (7 %)	n/a	n/a
      	org.xerial.snappy.SnappyNative.rawUncompress	 17,024 ms (5 %)	n/a	n/a
      	org.apache.kafka.clients.producer.internals.RecordAccumulator.append	 13,817 ms (4 %)	n/a	n/a
        4.3% - 13,817 ms org.apache.kafka.clients.producer.KafkaProducer.send
      

      Internally we have applied a patch to remove this bottleneck. The patch does the following (a sketch follows the list):
      1. replace the HashSet with a concurrent hash set
      2. remove sync from containsTopic and fetch
      3. pass a copy of topics to getClusterForCurrentTopics, since this synchronized method accesses topics at two locations, and topics being changed in the middle could break the semantics
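
      A fragment-level sketch of what such a patch could look like (names follow the 0.9.x Metadata class where this issue mentions them; the concurrent-set choice and the snapshot handling are assumptions, not the actual patch):

          import java.util.Collections;
          import java.util.HashSet;
          import java.util.Set;
          import java.util.concurrent.ConcurrentHashMap;

          // 1. back the topic set with a concurrent set instead of a plain HashSet
          private final Set<String> topics =
                  Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());

          // 2. containsTopic loses its synchronized keyword: reads of the concurrent
          //    set are thread-safe (fetch is handled the same way, with cluster read
          //    as a volatile field, per the note above)
          public boolean containsTopic(String topic) {
              return this.topics.contains(topic);
          }

          // 3. inside the synchronized update path, take a snapshot of the topic set
          //    and hand that to getClusterForCurrentTopics, so its two accesses see a
          //    consistent view even if another thread changes the set in between
          //    (the real method's signature is not reproduced here)
          Set<String> topicsSnapshot = new HashSet<String>(this.topics);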

      Any interest in applying this patch? Any alternative suggestions?

            People

              Assignee: Unassigned
              Reporter: Maysam Yabandeh
              Ismael Juma
              Votes: 0
              Watchers: 3
