Details
- Type: Bug
- Status: Closed
- Priority: Major
- Resolution: Duplicate
- Affects Version/s: 1.0.0, 1.0.1, 1.0.2
- Fix Version/s: None
- Component/s: None
Description
When the topology fails a tuple, the KafkaSpout never re-emits it. This is easy to reproduce with a small topology that fails every tuple, as sketched below.
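For illustration, a minimal reproducer along these lines (a sketch assuming the Storm 1.x core APIs; the KafkaSpout construction is elided because its configuration API differs across the affected 1.0.x releases, and the component names are made up):

    import java.util.Map;

    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Tuple;

    public class FailEveryTupleBolt extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            collector.fail(tuple); // fail unconditionally; the spout should replay every tuple
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // no output streams
        }
    }

    // Wiring (kafkaSpout is a KafkaSpout<K, V> configured for the test topic):
    // TopologyBuilder builder = new TopologyBuilder();
    // builder.setSpout("kafka-spout", kafkaSpout);
    // builder.setBolt("fail-bolt", new FailEveryTupleBolt()).shuffleGrouping("kafka-spout");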
Apparent reason:
    public class KafkaSpout<K, V> extends BaseRichSpout {
        //...
        private void doSeekRetriableTopicPartitions() {
            final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();
            for (TopicPartition rtp : retriableTopicPartitions) {
                final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
                if (offsetAndMeta != null) {
                    // seek to the next offset that is ready to commit in the next commit cycle
                    kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);
                } else {
                    // "Seek to last committed offset" <== actually seeks to the end of the partition
                    kafkaConsumer.seekToEnd(toArrayList(rtp));
                }
            }
        }
    }
The code seeks to the end of the partition instead of to the first uncommitted offset. For example, if no tuples have been acked yet and the tuple at offset 7 has failed, seekToEnd() jumps past it to the log end, so it is never re-emitted.
Preliminary fix (worked for me, but needs to be checked by an expert):
    private void doSeekRetriableTopicPartitions() {
        final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();
        for (TopicPartition rtp : retriableTopicPartitions) {
            final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
            if (offsetAndMeta != null) {
                // seek to the next offset that is ready to commit in the next commit cycle
                kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);
            } else {
                final OffsetAndMetadata committed = kafkaConsumer.committed(rtp);
                if (committed == null) {
                    // no offsets committed yet for this partition - start from the beginning
                    kafkaConsumer.seekToBeginning(toArrayList(rtp));
                } else {
                    // seek to the first uncommitted offset
                    kafkaConsumer.seek(rtp, committed.offset() + 1);
                }
            }
        }
    }
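For reference, whether a failed tuple's partition ever shows up in retryService.retriableTopicPartitions() is governed by the KafkaSpoutRetryService configured on the spout. A minimal sketch, assuming the storm-kafka-client 1.x KafkaSpoutRetryExponentialBackoff API (the intervals and retry count below are illustrative, not the project defaults):

    import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
    import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
    import org.apache.storm.kafka.spout.KafkaSpoutRetryService;

    KafkaSpoutRetryService retryService = new KafkaSpoutRetryExponentialBackoff(
            TimeInterval.microSeconds(500), // delay before the first retry
            TimeInterval.milliSeconds(2),   // base period of the exponential backoff
            Integer.MAX_VALUE,              // keep retrying failed tuples indefinitely
            TimeInterval.seconds(10));      // upper bound on the backoff delay

With the fix above, once a partition becomes retriable the consumer is rewound to the first uncommitted offset rather than to the log end, so the failed tuples are fetched and emitted again.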
Issue Links
- duplicates
  - STORM-2087 Storm-kafka-client: Failed tuples are not always replayed (Resolved)
- is duplicated by
  - STORM-2077 KafkaSpout doesn't retry failed tuples (Closed)