Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
1.x
-
None
Description
Kafka maintains the spout progress (offsets for partitions) which can hold a value which no longer exists (or offset+1 doesn't exist) in the topic due to following reasons
- Topology stopped processing (or died) & topic got compacted (cleanup.policy=compact) leaving offset voids in the topic.
- Topology stopped processing (or died) & Topic got cleaned up (cleanup.policy=delete) and the offset.
When the topology starts processing again (or restarted), the spout logic suggests that the next offset has to be (committedOffset+1) for the spout to make progress, which will never be the case as (committedOffset+1) has been removed from the topic and will never be acked.
if (currOffset == nextCommitOffset + 1) { // found the next offset to commit found = true; nextCommitMsg = currAckedMsg; nextCommitOffset = currOffset; } else if (currOffset > nextCommitOffset + 1) { LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset); }
A smart forwarding mechanism has to be built so as to forward the spout pivot to the next logical location, instead of a hardcoded single forward operation.
Attachments
Issue Links
- relates to
-
STORM-2639 Kafka Spout incorrectly computes numCommittedOffsets due to voids in the topic (topic compaction)
- Resolved
- links to