Kafka / KAFKA-2986

Consumer group doesn't lend itself well for slow consumers with varying message size


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 0.9.0.0
    • Fix Version/s: None
    • Component/s: consumer
    • Labels: None
    • Environment: Java consumer API 0.9.0.0

    Description

I sent a related post to the Kafka mailing list but haven't received any response: http://mail-archives.apache.org/mod_mbox/kafka-users/201512.mbox/%3CCAL%2BArfWNfkpymkNDuf6UJ06CJJ63XC1bPHeT4TSYXKjSsOpu-Q%40mail.gmail.com%3E So far, I think this is a design issue in Kafka, so I'm taking the liberty of creating an issue.

      Use case:

      • Slow consumption: maybe around 20 seconds per record.
      • Large variation in message size: serialized tasks range from ~300 bytes up to ~3 MB.
      • Consumption latency (20 seconds per record) is independent of message size.

      Code example:

      while (isRunning()) {
        // poll() both fetches records and, as a side effect, sends the
        // consumer's heartbeat to the group coordinator
        ConsumerRecords<String, byte[]> records = consumer.poll(100);
        for (final ConsumerRecord<String, byte[]> record : records) {
          // Handle record... (takes ~20 seconds per record)
        }
      }
      

Problem: Kafka itself doesn't have any issues with large messages (as long as you bump a few configuration limits, e.g. message.max.bytes on the broker and max.partition.fetch.bytes on the consumer). However, the problem is two-fold:

      • KafkaConsumer#poll is the only call that sends heartbeats (healthchecks) to the broker.
      • There is no limit on how many messages KafkaConsumer#poll will return; the only limit is on the total number of bytes prefetched. With varying message sizes this makes the session timeout extremely hard to tune:
        • The delay until the next KafkaConsumer#poll call is proportional to the number of records returned by the previous KafkaConsumer#poll call.
        • KafkaConsumer#poll will return either many small records or just a few larger records. With many small messages the risk of hitting the session timeout is very high, and raising the session timeout by the orders of magnitude needed for the smaller messages increases the latency until a dead consumer is discovered a thousandfold. (See the worked numbers after this list.)
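
      To make the trade-off concrete, here is a back-of-the-envelope sketch in Java. The 1 MB prefetch buffer is an assumption (the 0.9 default for max.partition.fetch.bytes); the message sizes and the ~20 seconds per record come from the use case above.

      public class PollGapEstimate {
        public static void main(String[] args) {
          long fetchBytes = 1024 * 1024; // assumed max.partition.fetch.bytes (0.9 default: 1 MB)
          long secondsPerRecord = 20;    // processing time from the use case above

          long smallBatch = fetchBytes / 300;                            // ~3,495 records of ~300 B per poll
          long largeBatch = Math.max(1, fetchBytes / (3 * 1024 * 1024)); // 1 record of ~3 MB per poll

          // The gap between two poll() calls -- and hence between two
          // heartbeats -- is the batch size times the per-record processing time:
          System.out.println("Small messages: " + smallBatch * secondsPerRecord + " s between polls"); // ~69,900 s (~19 h)
          System.out.println("Large messages: " + largeBatch * secondsPerRecord + " s between polls"); // 20 s
        }
      }

      So the session timeout would have to cover roughly 19 hours to survive a buffer full of small messages, while a genuinely dead consumer would then also go undetected for that long.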

      Proposed fixes: I do not claim to be a Kafka expert, but two ideas are to either

      • add a `KafkaConsumer#healthy` call to let the broker know we are still processing records; or
      • add an upper limit on the number of messages `KafkaConsumer#poll` returns. I am thinking of something like `KafkaConsumer#poll(timeout, nMaxMessages)`. This could obviously be a configuration property instead. To avoid the broker having to inspect the messages it sends, I suggest the KafkaConsumer decide how many messages it returns from poll. (A client-side sketch follows this list.)
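
      For the second idea, here is a minimal client-side sketch of what poll(timeout, nMaxMessages) could look like on top of the 0.9 API. BoundedPoller and all of its names are hypothetical, and note that it does not by itself fix the heartbeat problem: the real poll() is still only invoked once the buffer runs dry. (Later Kafka releases added a max.poll.records consumer property along these lines.)

      import java.util.ArrayDeque;
      import java.util.ArrayList;
      import java.util.List;
      import java.util.Queue;
      import org.apache.kafka.clients.consumer.Consumer;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;

      // Hypothetical sketch of the proposed poll(timeout, nMaxMessages):
      // buffer whatever the real poll() returns and hand out at most
      // nMaxMessages records per call.
      final class BoundedPoller<K, V> {
        private final Consumer<K, V> consumer;
        private final Queue<ConsumerRecord<K, V>> buffer = new ArrayDeque<>();

        BoundedPoller(Consumer<K, V> consumer) {
          this.consumer = consumer;
        }

        List<ConsumerRecord<K, V>> poll(long timeoutMs, int nMaxMessages) {
          if (buffer.isEmpty()) {
            ConsumerRecords<K, V> fetched = consumer.poll(timeoutMs);
            for (ConsumerRecord<K, V> record : fetched) {
              buffer.add(record);
            }
          }
          List<ConsumerRecord<K, V>> batch = new ArrayList<>(nMaxMessages);
          while (!buffer.isEmpty() && batch.size() < nMaxMessages) {
            batch.add(buffer.remove());
          }
          return batch;
        }
      }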

      Workarounds:

      • Have different topics for different message sizes. This makes tuning the partition prefetch size easier.
      • Use another tool.
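
      A third option on the 0.9 client, sketched below under stated assumptions, is to pause all assigned partitions while processing and call poll(0) between records, so heartbeats keep flowing without fetching more data. Caveat: if a rebalance fires inside one of those poll(0) calls, the in-flight batch may belong to partitions that have been revoked.

      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.common.TopicPartition;

      while (isRunning()) {
        ConsumerRecords<String, byte[]> records = consumer.poll(100);
        // Pause every assigned partition: subsequent polls heartbeat but return no records.
        TopicPartition[] assigned = consumer.assignment().toArray(new TopicPartition[0]);
        consumer.pause(assigned);
        for (final ConsumerRecord<String, byte[]> record : records) {
          // Handle record... (~20 seconds)
          consumer.poll(0); // sends a heartbeat; returns nothing while paused
        }
        consumer.resume(assigned);
      }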

Questions: Should Kafka be able to handle this case? Or am I using the wrong tool, and Kafka is simply designed for high throughput and low latency?


            People

              Assignee: Neha Narkhede (nehanarkhede)
              Reporter: Jens Rantil (ztyx)
              Votes: 3
              Watchers: 12
