  1. Apache Storm
  2. STORM-2625

KafkaSpout is not calculating uncommitted correctly

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Minor
    • Resolution: Duplicate
    • Affects Version/s: 1.1.1
    • Fix Version/s: None
    • Component/s: storm-kafka-client
    • Labels:
      None

      Description

      This happens when:
      1. KafkaSpout has previously committed offsets for a topic, and is not currently running/activated;
      2. The topic contains messages after the committed offsets;
      3. A topology with the same consumer group and multiple workers is started/activated again.

      The same issue may also occur when a running topology undergoes a consumer group partition reassignment and its offsets cannot be committed in time.

      The underlying issue is:

      a. Because workers register their Kafka consumers one by one, the first consumer A to register with the Kafka broker under the consumer group is assigned all the partitions, say partitions 0 and 1. Consumer A then retrieves messages from all of its assigned partitions, if any are available, and starts processing them. For every tuple KafkaSpout A emits, the uncommitted count numUncommittedOffsets is incremented (KafkaSpout#emitTupleIfNotEmitted()).

      b. At this point a second consumer B registers with the broker for the same consumer group. The broker then reassigns the partitions among the existing consumers; say consumer A is assigned partition 0 and consumer B is assigned partition 1.

      b.1 KafkaSpout A then tries to commit the acked offsets and remove the partition 1 offsets it is tracking (KafkaSpout.KafkaSpoutConsumerRebalanceListener#onPartitionsRevoked()). However, because not all of the tuples have been acked, KafkaSpout A cannot commit the full list of offsets to the Kafka broker.

      b.2 KafkaSpout A then removes the tracked partition 1 offsets from offsetManagers as well as from emitted (
      org.apache.storm.kafka.spout.KafkaSpout.KafkaSpoutConsumerRebalanceListener#onPartitionsAssigned()
      org.apache.storm.kafka.spout.KafkaSpout.KafkaSpoutConsumerRebalanceListener#initialize()). As a result, the tuples that were not yet acked will never be acked (org.apache.storm.kafka.spout.KafkaSpout#ack()), and the uncommitted count numUncommittedOffsets will never be reduced back to the correct value.
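
      The sequence in a, b, b.1 and b.2 can be reproduced with a toy model. The following is only a sketch under stated assumptions, not the actual KafkaSpout code: the class ToySpout and its methods emit, onPartitionRevoked and ack are hypothetical stand-ins, and committing is folded into ack for brevity. It illustrates how dropping the tracked offsets of a revoked partition without adjusting the counter could leave numUncommittedOffsets permanently too high.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class UncommittedDriftSketch {

    // Hypothetical, simplified stand-in for the spout's bookkeeping.
    static final class ToySpout {
        // partition -> offsets emitted but not yet acked
        final Map<Integer, Set<Long>> emitted = new HashMap<>();
        int numUncommittedOffsets = 0;

        // Step a: every newly emitted tuple increments the counter.
        void emit(int partition, long offset) {
            if (emitted.computeIfAbsent(partition, p -> new HashSet<>()).add(offset)) {
                numUncommittedOffsets++;
            }
        }

        // Steps b.1/b.2: tracking for the revoked partition is dropped,
        // but the counter is left untouched.
        void onPartitionRevoked(int partition) {
            emitted.remove(partition);
        }

        // Assumption: an ack is treated as an immediate commit here.
        // Acks for tuples that are no longer tracked are ignored.
        void ack(int partition, long offset) {
            Set<Long> offsets = emitted.get(partition);
            if (offsets == null || !offsets.remove(offset)) {
                return;
            }
            numUncommittedOffsets--;
        }
    }

    // Runs the scenario and returns the counter after every tuple was acked.
    static int simulate() {
        ToySpout a = new ToySpout();
        for (long offset = 0; offset < 3; offset++) {
            a.emit(0, offset); // consumer A initially owns partitions 0 and 1
            a.emit(1, offset);
        }
        a.onPartitionRevoked(1); // consumer B joins, partition 1 moves to B
        for (long offset = 0; offset < 3; offset++) {
            a.ack(0, offset);
            a.ack(1, offset); // arrives late, no longer tracked, so ignored
        }
        return a.numUncommittedOffsets;
    }

    public static void main(String[] args) {
        System.out.println("numUncommittedOffsets after all acks: " + simulate());
    }
}
```

      In this model the counter ends at 3 rather than 0: the three partition 1 tuples were counted when emitted but can never be subtracted again.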

              People

              • Assignee:
                Unassigned
                Reporter:
                Guang Du (GuangDu)
              • Votes:
                0
                Watchers:
                4

                  Time Tracking

                  Estimated:
                  Not Specified
                  Remaining:
                  0h
                  Logged:
                  1.5h