Apache Storm / STORM-2229

KafkaSpout does not resend failed tuples


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 1.0.0, 1.0.1, 1.0.2
    • Fix Version/s: None
    • Component/s: storm-kafka-client
    • Labels: None

    Description

      When the topology fails a tuple, the KafkaSpout never re-emits it. This is easy to reproduce with a small topology that fails every tuple.

      Apparent reason:

      public class KafkaSpout<K, V> extends BaseRichSpout {
          // ...
          private void doSeekRetriableTopicPartitions() {
              final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();

              for (TopicPartition rtp : retriableTopicPartitions) {
                  final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
                  if (offsetAndMeta != null) {
                      kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);  // seek to the next offset that is ready to commit in next commit cycle
                  } else {
                      kafkaConsumer.seekToEnd(toArrayList(rtp));    // "Seek to last committed offset" <== actually seeks to the END of the partition
                  }
              }
          }
          // ...
      }
      

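      When no acked offset is ready to commit for a retriable partition, the code seeks to the end of the partition instead of to the first uncommitted offset. The failed records are therefore skipped and never re-emitted.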
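To make the failure mode concrete, here is a minimal, self-contained simulation that does not touch the Kafka client at all. `SeekSimulation` and `SimulatedPartition` are hypothetical names: the partition is modelled as a log end offset plus a consumer position. The scenario assumes offsets 0..9 were emitted, offset 4 is the committed offset, 5..9 failed, and nothing new has been acked, so the `else` branch runs. Seeking to the end replays nothing, while seeking one past the committed offset (as the preliminary fix in this report does) replays exactly the failed records.

```java
import java.util.ArrayList;
import java.util.List;

public class SeekSimulation {

    /** Hypothetical stand-in for one Kafka partition and a consumer's position in it. */
    static final class SimulatedPartition {
        final long endOffset; // offset the next produced record would receive
        long position;        // offset the next poll would start from

        SimulatedPartition(long endOffset) {
            this.endOffset = endOffset;
        }

        void seek(long offset) { position = offset; }

        void seekToEnd() { position = endOffset; }

        /** Returns every offset from the current position up to the end, advancing the position. */
        List<Long> poll() {
            List<Long> records = new ArrayList<>();
            while (position < endOffset) {
                records.add(position++);
            }
            return records;
        }
    }

    /** Offsets re-emitted after a retry when the spout seeks to the end (reported behavior). */
    static List<Long> replayWithSeekToEnd(long endOffset) {
        SimulatedPartition p = new SimulatedPartition(endOffset);
        p.seekToEnd();
        return p.poll();
    }

    /** Offsets re-emitted after a retry when the spout seeks one past the committed offset. */
    static List<Long> replayFromCommitted(long endOffset, long committedOffset) {
        SimulatedPartition p = new SimulatedPartition(endOffset);
        p.seek(committedOffset + 1);
        return p.poll();
    }

    public static void main(String[] args) {
        // Offsets 0..9 were emitted, offset 4 is committed, offsets 5..9 failed.
        System.out.println("seekToEnd replays: " + replayWithSeekToEnd(10));
        System.out.println("seek(committed + 1) replays: " + replayFromCommitted(10, 4));
    }
}
```

Running `main` prints `seekToEnd replays: []` followed by `seek(committed + 1) replays: [5, 6, 7, 8, 9]`, which is the "failed tuples are never resent" symptom in miniature.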

      Preliminary fix (it worked for me, but it needs to be checked by an expert):

          private void doSeekRetriableTopicPartitions() {
              final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();

              for (TopicPartition rtp : retriableTopicPartitions) {
                  final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
                  if (offsetAndMeta != null) {
                      kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);  // seek to the next offset that is ready to commit in next commit cycle
                  } else {
                      OffsetAndMetadata committed = kafkaConsumer.committed(rtp);
                      if (committed == null) {
                          // No offsets committed yet for this partition: start from the beginning
                          kafkaConsumer.seekToBeginning(toArrayList(rtp));
                      } else {
                          // Seek to the first uncommitted offset
                          kafkaConsumer.seek(rtp, committed.offset() + 1);
                      }
                  }
              }
          }
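The fallback branch of the preliminary fix reduces to one decision per partition: with no commit, rewind to the beginning; otherwise resume one past the committed offset. A minimal sketch of that decision as a pure function, so it can be unit-tested without a broker. `ResumePoint`, `resumeOffset` and `beginningOffset` are hypothetical names; passing `null` for `committed` mirrors `KafkaConsumer.committed(...)` returning `null` when the partition has no prior commit.

```java
public final class ResumePoint {

    private ResumePoint() {}

    /**
     * Offset to seek to for a retriable partition when no acked offset is ready to commit.
     *
     * @param committed       last committed offset, or null if nothing was committed yet
     * @param beginningOffset earliest offset still available in the partition
     */
    static long resumeOffset(Long committed, long beginningOffset) {
        if (committed == null) {
            // No commit yet: replay the partition from the beginning
            return beginningOffset;
        }
        // Resume one past the committed offset, matching the preliminary fix
        return committed + 1;
    }

    public static void main(String[] args) {
        System.out.println(resumeOffset(null, 0L)); // prints 0
        System.out.println(resumeOffset(4L, 0L));   // prints 5
    }
}
```

Keeping the decision separate from the `KafkaConsumer` calls makes both branches easy to cover in a test; whether `+ 1` is correct depends on the spout's commit convention, which is exactly the part that needs expert review.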
      

    People

      Assignee: Unassigned
      Reporter: Matthias Klein
      Votes: 0
      Watchers: 2
