Uploaded image for project: 'Kafka'
  1. Kafka
  2. KAFKA-6088

Kafka Consumer slows down when reading from highly compacted topics

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.10.2.1
    • Fix Version/s: 0.11.0.0
    • Component/s: clients
    • Labels:
      None

      Description

      Summary of the issue


      We found a performance issue with the Kafka Consumer where it gets less efficient if you have frequent gaps in offsets (which happens when there is lots of compaction on the topic).

      The issue is present in 0.10.2.1 and possibly prior.

      It is fixed in 0.11.0.0.

      Summary of cause


      The fetcher code assumes that there will be no gaps in message offsets. If there are, it does an additional round trip to the broker. For topics with large gaps in offsets, it is possible that most calls to poll() will generate a roundtrip to the broker.

      Background and details


      We have a topic with roughly 8 million records. The topic is log compacted. It turns out that most of the initial records in the topic were never overwritten, whereas in the 2nd half of the topic we had lots of overwritten records. That means that for the first part of the topic, there are no gaps in offsets. But in the 2nd part of the topic, there are frequent gaps in the offsets (due to records being compacted away).

      We have a consumer that starts up and reads the entire topic from beginning to end. We noticed that the consumer would read through the first part of the topic very quickly. When it got to the part of the topic with frequent gaps in offsets, consumption rate slowed down dramatically. This slowdown was consistent across multiple runs.

      What is happening is this:
      1) A call to poll() happens. The consumer goes to the broker and returns 1MB of data (the default of max.partition.fetch.bytes). It then returns to the caller just 500 records (the default of max.poll.records), and keeps the rest of the data in memory to use in future calls to poll().
      2) Before returning the 500 records, the consumer library records the next offset it should return. It does so by taking the offset of the last record, and adds 1 to it. (The offset of the 500th message from the set, plus 1). It calls this the nextOffset
      3) The application finishes processing the 500 messages, and makes another call to poll() happens. During this call, the consumer library does a sanity check. It checks that the first message of the set it is about to return has an offset that matches the value of nextOffset. That is it checks if the 501th record has an offset that is 1 greater than the 500th record.
      a. If it matches, then it returns an additional 500 records, and increments the nextOffset to (offset of the 1000th record, plus 1)
      b. If it doesn't match, then it throws away the remainder of the 1MB of data that it stored in memory in step 1, and it goes back to the broker to fetch an additional 1MB of data, starting at the offset nextOffset.

      In topics have no gaps (a non-compacted topic), then the code will always hit the 3a code path.
      If the topic has gaps in offsets and the call to poll() happens to fall onto a gap, then the code will hit code path 3b.

      If the gaps are frequent, then it will frequently hit code path 3b.

      The worst case scenario that can happen is if you have a large number of gaps, and you run with max.poll.records=1. Every gap will result in a new fetch to the broker. You may possibly end up only processing one message per fetch. Or, said another way, you will end up doing a single fetch for every single message in the partition.

      Repro


      We created a repro. It appears that the bug is in 0.10.2.1, but was fixed in 0.11. I've attached the tarball with all the code and instructions.

      The repro is:
      1) Create a single partition topic with log compaction turned on
      2) Write messages with the following keys: 1 1 2 2 3 3 4 4 5 5 ... (each message key written twice in a row)
      3) Let compaction happen. This would mean that that offsets 0 2 4 6 8 10 ... would be compacted away
      4) Consume from this topic with max.poll.records=1

      More concretely,

      Here is the producer code:

      Producer<String, String> producer = new KafkaProducer<String, String>(props); 
      for (int i = 0; i < 1000000; i++) { 
          producer.send(new ProducerRecord<String, String>("compacted", Integer.toString(i), Integer.toString(i))); 
          producer.send(new ProducerRecord<String, String>("compacted", Integer.toString(i), Integer.toString(i))); 
      } 
      producer.flush(); 
      producer.close();
      

      When consuming with a 0.10.2.1 consumer, you can see this pattern (with Fetcher logs at DEBUG, see file consumer_0.10.2/debug.log):

      offset = 1, key = 0, value = 0 
      22:58:51.262 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 3 since the current position is 2 
      22:58:51.263 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null) 
      offset = 3, key = 1, value = 1 
      22:58:51.299 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 5 since the current position is 4 
      22:58:51.299 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null) 
      offset = 5, key = 2, value = 2 
      22:58:51.337 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 7 since the current position is 6 
      22:58:51.337 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null) 
      offset = 7, key = 3, value = 3 
      22:58:51.361 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 9 since the current position is 8 
      22:58:51.361 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null) 
      offset = 9, key = 4, value = 4 
      22:58:51.382 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 11 since the current position is 10 
      22:58:51.382 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null) 
      offset = 11, key = 5, value = 5 
      22:58:51.404 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 13 since the current position is 12 
      22:58:51.404 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null) 
      offset = 13, key = 6, value = 6 
      22:58:51.424 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Ignoring fetched records for compacted-0 at offset 15 since the current position is 14 
      22:58:51.424 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Sending fetch for partitions [compacted-0] to broker 192.168.0.105:9092 (id: 0 rack: null)
      

      When consuming with a 0.11.0.1 consumer ,you can see the following pattern: (see file consumer_0.11/debug.log):

      offset = 1, key = 0, value = 0 
      offset = 3, key = 1, value = 1 
      offset = 5, key = 2, value = 2 
      offset = 7, key = 3, value = 3 
      offset = 9, key = 4, value = 4 
      offset = 11, key = 5, value = 5 
      offset = 13, key = 6, value = 6 
      offset = 15, key = 7, value = 7 
      offset = 17, key = 8, value = 8 
      offset = 19, key = 9, value = 9 
      offset = 21, key = 10, value = 10 
      

      From looking at the github history, it appears it was fixed in https://github.com/apache/kafka/commit/a0b8e435c9419a9402d08408260bea0c1d95cff0

      Specifically, this line
      https://github.com/apache/kafka/commit/a0b8e435c9419a9402d08408260bea0c1d95cff0#diff-b45245913eaae46aa847d2615d62cde0L930

      Was replaced by this line:
      https://github.com/apache/kafka/commit/a0b8e435c9419a9402d08408260bea0c1d95cff0#diff-b45245913eaae46aa847d2615d62cde0R933

      Mitigation


      This problem is fixed in 0.11.0.0. If you can upgrade to 0.11.0.0, then you will not be affected by the problem.

      If you cannot upgrade to 0.11.0.0, then you can reduce the impact of this by increasing the value of max.poll.records. This works because check happens on each call to poll(), and increasing the value of max.poll.records will reduce the number of calls to poll().

        Attachments

          Activity

            People

            • Assignee:
              Unassigned
              Reporter:
              wushujames James Cheng
              Reviewer:
              Apurva Mehta
            • Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

              • Created:
                Updated:
                Resolved: