  1. Apache Storm
  2. STORM-2542

Deprecate storm-kafka-client KafkaConsumer.subscribe API subscriptions on 1.x and remove them as options in 2.x

Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.0.0, 1.1.0
    • Fix Version/s: 2.0.0
    • Component/s: storm-kafka-client
    • Labels: None

    Description

      Most of this is copied from the mailing list post:

      We've recently seen some issues raised by users using the default subscription API in the new KafkaSpout (https://issues.apache.org/jira/browse/STORM-2514, https://issues.apache.org/jira/browse/STORM-2538).

      A while ago an alternative subscription implementation was added (https://github.com/apache/storm/pull/1835), which uses the KafkaConsumer.assign API instead.

      The subscribe API used by default causes Kafka to assign partitions to available consumers automatically. It allows a consumer group to keep processing even in the presence of crashes because partitions are reassigned when a consumer becomes unavailable.

      The assign API used in the alternative subscription implementation leaves it up to the consuming code to figure out a reasonable partition distribution among a consumer group. The assign API is essentially equivalent to how the old storm-kafka spout distributes partitions across spout instances, and as far as I know it has worked well there.
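      As a rough illustration of what "figuring out a reasonable partition distribution" can look like, here is a minimal sketch of a round-robin split. It is not taken from the storm-kafka-client code; the class and method names are hypothetical, and it assumes every spout task sees the same sorted partition list and knows its own task index and the total task count. The resulting partitions would then be wrapped in TopicPartition objects and passed to KafkaConsumer.assign.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of storm-kafka-client.
public class ManualPartitionAssignment {

    /**
     * Returns the partitions the task at taskIndex should read.
     * Assumes all tasks call this with the same sorted partition list,
     * so the per-task results are disjoint and together cover every partition.
     */
    public static List<Integer> partitionsForTask(List<Integer> sortedPartitions,
                                                  int taskIndex, int totalTasks) {
        if (totalTasks <= 0 || taskIndex < 0 || taskIndex >= totalTasks) {
            throw new IllegalArgumentException("taskIndex must be in [0, totalTasks)");
        }
        List<Integer> mine = new ArrayList<>();
        // Take every totalTasks-th partition, starting at this task's index.
        for (int i = taskIndex; i < sortedPartitions.size(); i += totalTasks) {
            mine.add(sortedPartitions.get(i));
        }
        return mine;
    }

    public static void main(String[] args) {
        List<Integer> partitions = Arrays.asList(0, 1, 2, 3, 4);
        for (int task = 0; task < 2; task++) {
            System.out.println("task " + task + " -> " + partitionsForTask(partitions, task, 2));
        }
        // prints:
        // task 0 -> [0, 2, 4]
        // task 1 -> [1, 3]
    }
}
```

      Because the result depends only on the partition list and the task index, a restarted task computes the same partitions it had before the crash, and no other task's assignment changes.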

      Storm already ensures that all spout instances are running, and restarts them if they crash, so we're not really gaining much by using the subscribe API.

      The disadvantages to using the subscribe API are:

      • Whenever an executor crashes, the Kafka cluster reassigns all partitions. This causes all KafkaSpout instances in that consumer group to pause until reassignment is complete.
      • The partition assignment is random, so it is difficult for users to predict which partitions are assigned to which spout task.
      • The subscribe API is very likely to cause hangs and other erratic behavior if the KafkaSpout is configured to run multiple tasks in one executor. When KafkaConsumer.poll is called during partition reassignment, it blocks until the reassignment is complete. If there are multiple consumers in a thread, the first consumer to be called blocks, and the other consumers are ejected from the list of active consumers after a timeout, because they could not call poll during the rebalance. See the example code in https://issues.apache.org/jira/browse/STORM-2514, which runs two KafkaConsumers in one thread. The result is that the consumers flip-flop between being active, and most polls take ~30 seconds (the Kafka session timeout).
      • The random assignment of partitions causes more message duplication than is necessary. When an executor crashes, all the other executors have their partitions reassigned. This makes it likely that some of them will lose a partition they had in-flight tuples on, which they will then be unable to commit to Kafka. The message is then reemitted by whichever KafkaSpout instance was assigned the partition. See https://issues.apache.org/jira/browse/STORM-2538.
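      The duplication problem in the last point can be illustrated with a toy model. The sketch below is an assumption-laden simplification, not Kafka's actual assignor logic: it deals partitions round-robin over the live consumers (the class, method, and consumer names are hypothetical) and counts how many partitions are taken away from consumers that did not crash. Those are the partitions whose in-flight tuples could no longer be committed by their previous owner.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only; real Kafka assignors behave differently in detail.
public class RebalanceChurn {

    /** Deals partitions 0..partitionCount-1 round-robin over the live consumers. */
    public static Map<Integer, String> distribute(int partitionCount, List<String> liveConsumers) {
        Map<Integer, String> owners = new LinkedHashMap<>();
        for (int p = 0; p < partitionCount; p++) {
            owners.put(p, liveConsumers.get(p % liveConsumers.size()));
        }
        return owners;
    }

    /** Counts partitions whose previous owner is still alive but no longer owns them. */
    public static int partitionsTakenFromSurvivors(Map<Integer, String> before,
                                                   Map<Integer, String> after,
                                                   String crashed) {
        int moved = 0;
        for (Map.Entry<Integer, String> e : before.entrySet()) {
            String oldOwner = e.getValue();
            if (!oldOwner.equals(crashed) && !oldOwner.equals(after.get(e.getKey()))) {
                moved++;
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        Map<Integer, String> before = distribute(6, Arrays.asList("A", "B", "C"));

        // subscribe-style: consumer C crashes and all partitions are redistributed over A and B.
        Map<Integer, String> rebalanced = distribute(6, Arrays.asList("A", "B"));
        System.out.println(partitionsTakenFromSurvivors(before, rebalanced, "C")); // prints 2

        // assign-style: the assignment is fixed, so A and B keep their partitions
        // while C's partitions wait for Storm to restart C.
        System.out.println(partitionsTakenFromSurvivors(before, before, "C")); // prints 0
    }
}
```

      In this model, two of the four partitions owned by the surviving consumers change hands after the rebalance, while the fixed assignment moves none of them.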

      I'd like to drop support for the subscribe API, and switch to using the assign API by default.

      The KafkaConsumer Javadoc even mentions applications like Storm as a case where the subscribe API doesn't really add value:

      "If the process itself is highly available and will be restarted if it fails (perhaps using a cluster management framework like YARN, Mesos, or AWS facilities, or as part of a stream processing framework). In this case there is no need for Kafka to detect the failure and reassign the partition since the consuming process will be restarted on another machine."

            People

              Assignee: Stig Rohde Døssing (srdo)
              Reporter: Stig Rohde Døssing (srdo)

                Time Tracking

                  Original Estimate: Not Specified
                  Remaining Estimate: 0h
                  Time Spent: 3h