Details
Bug
Status: Closed
Blocker
Resolution: Fixed
1.0.5
None
Description
A KafkaSpout stops polling messages from its topic when the topology has no ackers set ("topology.acker.executors" = 0). This happens once the KafkaSpout has polled (and emitted) as many messages as the maximum number of uncommitted offsets (KafkaSpoutConfig.maxUncommittedOffsets).
This happens because KafkaSpout.ack() is called before the emit method has added the message to the "emitted" collection (a Set). In that case the message is not added to the "acked" collection (a Map), and hence is never committed. This seems to be a bug introduced in a version later than storm-kafka-client 1.0.1: the current code (1.0.5) has a check in ack() that tests whether the acked message is present in the "emitted" collection and, if it is not, ignores it (never adds it to the "acked" collection).
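The ordering problem described above can be sketched with a small stand-alone model. This is not the actual KafkaSpout code: the class name, method names, and the use of plain offsets as keys are assumptions made for illustration. Only the "emitted" Set, the "acked" Map, and the check in ack() come from the description.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the spout's bookkeeping (not Storm code).
class AckBeforeEmitSketch {
    final Set<Long> emitted = new HashSet<>();        // offsets emitted, awaiting ack
    final Map<Long, Boolean> acked = new HashMap<>(); // offsets acked, awaiting commit

    // Mirrors the check described above: an ack for an offset that is
    // not in "emitted" is silently ignored.
    void ack(long offset) {
        if (!emitted.contains(offset)) {
            return; // never reaches "acked", so it is never committed
        }
        emitted.remove(offset);
        acked.put(offset, Boolean.TRUE);
    }

    // Zero ackers: the ack callback runs before the offset is recorded.
    void emitWithoutAckers(long offset) {
        ack(offset);         // ack arrives first
        emitted.add(offset); // bookkeeping happens too late
    }

    // With ackers: the offset is recorded first, the ack arrives afterwards.
    void emitWithAckers(long offset) {
        emitted.add(offset);
        ack(offset);
    }
}
```

In this model, an offset passed to emitWithoutAckers stays in "emitted" forever and never appears in "acked", which matches the symptom reported here.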
Steps to reproduce issue:
1. Create a topology with a KafkaSpout from storm-kafka-client
2. Set the number of ackers ("topology.acker.executors") to 0 (org.apache.storm.Config.setNumAckers(0))
3. While creating the KafkaSpout instance, set SpoutConfig.setMaxUncommittedOffsets to a small number (50 or 100)
4. Start the topology and make sure that the KafkaSpout has more messages to poll from Kafka than the value of setMaxUncommittedOffsets
5. KafkaSpout stops polling messages from the topic
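The effect of these steps can be approximated with a toy loop. This is a sketch under stated assumptions, not the spout's real poll logic: the class, method, and parameter names are hypothetical, and committing is modeled as happening immediately after a recorded ack, whereas a real spout commits periodically.

```java
// Hypothetical model of the stall (not Storm code).
class StallSketch {
    // Returns how many messages are emitted before polling stops.
    static int emittedBeforeStall(int maxUncommittedOffsets,
                                  int messagesInTopic,
                                  boolean acksAreRecorded) {
        int uncommitted = 0;
        int emittedTotal = 0;
        while (emittedTotal < messagesInTopic) {
            if (uncommitted >= maxUncommittedOffsets) {
                break; // limit reached: the spout refuses to poll again
            }
            emittedTotal++;
            uncommitted++;
            if (acksAreRecorded) {
                uncommitted--; // ack recorded, offset committed (simplified)
            }
        }
        return emittedTotal;
    }
}
```

With acksAreRecorded set to false, which models acks being ignored, the loop stops after exactly maxUncommittedOffsets messages. With it set to true, all messages in the topic are consumed.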