Affects Version/s: 1.1.1
Fix Version/s: None
This happens when:
1. KafkaSpout has previously committed offsets for a topic and is currently not running/activated;
2. There are messages in the topic after the committed offsets;
3. A topology with multiple workers using the same consumer group is started/activated again;
The same issue may also occur when a running topology undergoes a consumer group partition reassignment and its offsets cannot be committed in time.
The underlying issue is:
a. Because workers register their Kafka consumers one by one, the first consumer A to register with the Kafka broker for the consumer group is assigned all the partitions, say partitions 0 and 1. Consumer A then fetches messages from all of its assigned partitions and starts processing them. For every tuple KafkaSpout A emits, the uncommitted count numUncommittedOffsets is incremented (KafkaSpout#emitTupleIfNotEmitted());
b. At this point a second consumer B registers with the broker for the same consumer group. The broker then reassigns the partitions among the existing consumers, say consumer A gets partition 0 and consumer B gets partition 1.
b.1 At this point KafkaSpout A tries to commit the acked offsets and removes the partition 1 offsets it is tracking (KafkaSpout.KafkaSpoutConsumerRebalanceListener#onPartitionsRevoked()). However, because not all tuples have been acked, KafkaSpout cannot commit the full list of offsets to the Kafka broker.
b.2 KafkaSpout A then removes the tracked partition 1 offsets from offsetManagers as well as from emitted (org.apache.storm.kafka.spout.KafkaSpout.KafkaSpoutConsumerRebalanceListener#initialize()). As a result, the unacked tuples can never be acked (org.apache.storm.kafka.spout.KafkaSpout#ack()), and the uncommitted count numUncommittedOffsets is never reduced back to the correct value.
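The bookkeeping leak described in steps a through b.2 can be illustrated with a small, self-contained model. This is a hypothetical sketch, not the actual KafkaSpout code: SpoutModel, emit(), ack() and onPartitionsRevoked() are illustrative names, and the model simplifies by treating an ack as an immediate commit (in the real spout the counter is decreased when acked offsets are committed). It only shows that dropping a revoked partition's tracking state without adjusting the counter leaves numUncommittedOffsets permanently too high.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

public class UncommittedLeakSketch {

    // Hypothetical, simplified model of the spout's offset bookkeeping (not the real KafkaSpout).
    static final class SpoutModel {
        int numUncommittedOffsets = 0;
        // partition -> offsets emitted but not yet acked/committed
        final Map<Integer, TreeSet<Long>> tracked = new HashMap<>();

        void emit(int partition, long offset) {
            tracked.computeIfAbsent(partition, p -> new TreeSet<>()).add(offset);
            numUncommittedOffsets++; // step a: every emit increments the counter
        }

        void ack(int partition, long offset) {
            TreeSet<Long> offsets = tracked.get(partition);
            // Acks for partitions that are no longer tracked are silently ignored.
            if (offsets != null && offsets.remove(offset)) {
                // Simplification: an ack is treated as an immediate commit.
                numUncommittedOffsets--;
            }
        }

        void onPartitionsRevoked(int partition) {
            // Steps b.1/b.2 as modelled here: tracking state is dropped,
            // but the counter is NOT reduced for the offsets that were dropped.
            tracked.remove(partition);
        }
    }

    public static void main(String[] args) {
        SpoutModel a = new SpoutModel();
        // Consumer A initially owns partitions 0 and 1 and emits two tuples from each.
        a.emit(0, 10L);
        a.emit(0, 11L);
        a.emit(1, 20L);
        a.emit(1, 21L);
        // Consumer B joins; partition 1 is revoked from A before its tuples are acked.
        a.onPartitionsRevoked(1);
        // All four tuples are eventually acked, but the partition 1 acks are ignored.
        a.ack(0, 10L);
        a.ack(0, 11L);
        a.ack(1, 20L);
        a.ack(1, 21L);
        System.out.println("numUncommittedOffsets = " + a.numUncommittedOffsets);
    }
}
```

Running main prints `numUncommittedOffsets = 2`: the two partition 1 tuples stay counted as uncommitted even though every tuple was acked, which mirrors the leak described above.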