Kafka / KAFKA-3627

New consumer doesn't run delayed tasks while under load



    • Bug
    • Status: Resolved
    • Blocker
    • Resolution: Fixed
    • consumer
    • None


      If the new consumer receives a steady flow of fetch responses it will not run delayed tasks, which means it will not heartbeat or perform automatic offset commits.

      The main cause is the code that attempts to pipeline fetch responses and keep the consumer fed. Specifically, KafkaConsumer::pollOnce() contains a check that skips the call to client.poll() if fetched records are already available (line 903 in the 0.9.0 branch as of this writing). Then, in KafkaConsumer::poll(), if records are returned, the consumer initiates another fetch and performs a quick poll, which sends and receives fetch requests and responses but does not run delayed tasks.

      If the timing works out and the consumer consistently receives fetched records, it won't run delayed tasks until a quick poll completes without receiving a fetch response. That leads to a rebalance because the consumer isn't heartbeating, and it typically means all the consumed records will be redelivered, since the automatic offset commit wasn't able to run either.
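
The starvation described above can be illustrated with a toy model. This is only a sketch of the control flow as described in this report, not the actual KafkaConsumer code: the class name, method name, and the boolean flag are hypothetical, and a "full poll" is assumed to always end with a fetch response available.

```java
/** Toy model of the poll loop described in this report; not the real KafkaConsumer code. */
class DelayedTaskStarvationSketch {

    /**
     * Simulates the given number of poll() calls and returns how many times
     * delayed tasks (heartbeat, auto-commit) got a chance to run.
     *
     * @param quickPollAlwaysGetsResponse true models a consumer under steady load,
     *                                    where every quick poll receives a fetch response
     */
    static int countDelayedTaskRuns(int pollCalls, boolean quickPollAlwaysGetsResponse) {
        boolean recordsReady = false; // nothing fetched before the first poll
        int delayedTaskRuns = 0;

        for (int i = 0; i < pollCalls; i++) {
            // pollOnce(): the full client poll is skipped when fetched records are ready.
            if (!recordsReady) {
                delayedTaskRuns++;   // only the full poll runs delayed tasks
                recordsReady = true; // assumption: the full poll ends with a fetch response
            }

            // poll(): records are returned to the caller, the next fetch is sent,
            // and a quick poll does network I/O only (no delayed tasks).
            recordsReady = quickPollAlwaysGetsResponse;
        }
        return delayedTaskRuns;
    }

    public static void main(String[] args) {
        System.out.println("steady load:   " + countDelayedTaskRuns(1000, true));
        System.out.println("no pipelining: " + countDelayedTaskRuns(1000, false));
    }
}
```

In this model, delayed tasks run only on the very first call when every quick poll finds a response, which matches the missing heartbeats and offset commits reported here.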

      Steps to reproduce
      1. Start up a cluster with at least 2 brokers. This seems to be required to reproduce the issue; my guess is that the fetch responses all arrive together when using a single broker.
      2. Create a topic with several partitions (10 in this example)
        • bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic delayed-task-bug --partitions 10 --replication-factor 1

      3. Generate some test data so the consumer has plenty to consume. In this case I'm just using UUIDs
        • for ((i=0;i<100;++i)); do cat /proc/sys/kernel/random/uuid >> /tmp/test-messages; done

        • bin/kafka-console-producer.sh --broker-list localhost:9092 --topic delayed-task-bug < /tmp/test-messages

      4. Start up a consumer with a small max fetch size to ensure it only pulls a few records at a time. The consumer can simply sleep for a moment when it receives a record.
        • I'll attach an example in Java
      5. There's a timing aspect to this issue, so it may take a few attempts to reproduce
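
For step 4, a consumer configuration along the following lines might be used. This is a minimal sketch and not the attached DelayedTaskBugConsumer.java: the class and method names are hypothetical, and every value (fetch size, commit interval, offset reset) is an illustrative assumption. The fetch size in particular must stay at least as large as a single message, or the consumer cannot make progress.

```java
import java.util.Properties;

/** Hypothetical helper that builds consumer settings for the reproduction steps. */
class DelayedTaskBugConsumerConfigSketch {

    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        // Automatic offset commits are one of the delayed tasks that get starved.
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000"); // assumed value

        // Read the pre-generated test data from the start of each partition.
        props.put("auto.offset.reset", "earliest");

        // Small per-partition fetch size so each fetch returns only a few records.
        // Assumed value; it must be at least as large as one message.
        props.put("max.partition.fetch.bytes", "256");
        return props;
    }
}
```

These properties would be passed to a `KafkaConsumer<String, String>`, which then subscribes to `delayed-task-bug` and calls `poll` in a loop, sleeping briefly for each record it receives.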


        1. DelayedTaskBugConsumer.java
          4 kB
          Rob Underwood
        2. kafka-3627-output.log
          60 kB
          Rob Underwood



            hachikuji Jason Gustafson
            robu Rob Underwood