Apache Storm / STORM-2229

KafkaSpout does not resend failed tuples


    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 1.0.0, 1.0.1, 1.0.2
    • Fix Version/s: None
    • Component/s: storm-kafka-client
    • Labels:
      None

      Description

      When the topology fails a tuple, the KafkaSpout never resends it. This is easy to reproduce with a small topology that fails every tuple.

      Apparent reason:

      public class KafkaSpout<K, V> extends BaseRichSpout {
          //...
          private void doSeekRetriableTopicPartitions() {
              final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();

              for (TopicPartition rtp : retriableTopicPartitions) {
                  final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
                  if (offsetAndMeta != null) {
                      kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);  // seek to the next offset that is ready to commit in next commit cycle
                  } else {
                      kafkaConsumer.seekToEnd(toArrayList(rtp));    // Seek to last committed offset <== actually seeks to the END of the partition
                  }
              }
          }
          //...
      }
      

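      When no acked offset is ready to commit for a retriable partition, the code seeks to the end of the partition instead of to the first uncommitted offset, so the failed tuples are skipped and never re-emitted.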
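Why the else branch is reached: in a simplified reading of the 1.0.x code, findNextCommitOffset() only returns an offset when the acked offsets directly following the last committed one form a contiguous run. If the very next offset is the one that failed, it returns null and the spout falls into seekToEnd. Below is a minimal stdlib-only model of that lookup; the class CommitOffsetModel and its method signature are hypothetical and are not the storm-kafka-client API.

```java
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

/**
 * Hypothetical, simplified model of how the spout picks the next offset to commit for one
 * partition: starting after the last committed offset, walk the acked offsets in order and
 * stop at the first gap. Not the actual storm-kafka-client implementation.
 */
final class CommitOffsetModel {

    /** Returns the highest contiguous acked offset after lastCommitted, or null if there is none. */
    static Long findNextCommitOffset(long lastCommitted, NavigableSet<Long> acked) {
        Long next = null;
        long expected = lastCommitted + 1;
        for (long offset : acked.tailSet(expected, true)) {
            if (offset != expected) {
                break; // gap: the offset at 'expected' is not acked (failed or still pending)
            }
            next = offset;
            expected++;
        }
        return next;
    }

    public static void main(String[] args) {
        // Offsets 0..4 committed, 5 failed, 6..9 acked: nothing is ready to commit.
        NavigableSet<Long> acked = new TreeSet<>(List.of(6L, 7L, 8L, 9L));
        System.out.println(findNextCommitOffset(4L, acked)); // prints "null" -> quoted code calls seekToEnd

        // Offsets 0..4 committed, 5 and 6 acked, 7 failed, 8 acked.
        acked = new TreeSet<>(List.of(5L, 6L, 8L));
        System.out.println(findNextCommitOffset(4L, acked)); // prints "6"
    }
}
```

In other words, the buggy branch is hit exactly when the first failed tuple sits right after the last committed offset, which is always the case in a topology that fails every tuple.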

      Preliminary fix (works for me, but needs to be checked by an expert):

          private void doSeekRetriableTopicPartitions() {
              final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();

              for (TopicPartition rtp : retriableTopicPartitions) {
                  final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
                  if (offsetAndMeta != null) {
                      kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);  // seek to the next offset that is ready to commit in next commit cycle
                  } else {
                      OffsetAndMetadata committed = kafkaConsumer.committed(rtp);
                      if (committed == null) {
                          // No offsets committed yet for this partition - start from the beginning
                          kafkaConsumer.seekToBeginning(toArrayList(rtp));
                      } else {
                          // Seek to the first uncommitted offset
                          kafkaConsumer.seek(rtp, committed.offset() + 1);
                      }
                  }
              }
          }
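To illustrate the effect of the proposed change, here is a toy model (hypothetical names, plain Java, no Kafka client involved) of where the consumer position ends up for a retriable partition when findNextCommitOffset() returns null, and which offsets would be fetched again. It assumes the convention used in the fix above, where the committed offset is the last acked record, hence the + 1.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical toy model of the null branch in doSeekRetriableTopicPartitions().
 * It only computes positions and offset ranges; no Kafka consumer is involved.
 */
final class SeekPositionModel {

    /** Quoted 1.0.x behaviour: seekToEnd moves the position to the log end offset. */
    static long positionWithSeekToEnd(long logEndOffset) {
        return logEndOffset;
    }

    /** Proposed behaviour: resume after the last committed offset, or from the log start if none. */
    static long positionWithProposedFix(Long lastCommitted, long logStartOffset) {
        return lastCommitted == null ? logStartOffset : lastCommitted + 1;
    }

    /** Offsets the consumer would fetch again between the given position and the log end. */
    static List<Long> offsetsReFetched(long position, long logEndOffset) {
        List<Long> offsets = new ArrayList<>();
        for (long o = position; o < logEndOffset; o++) {
            offsets.add(o);
        }
        return offsets;
    }

    public static void main(String[] args) {
        long logStart = 0L;
        long logEnd = 10L;       // offsets 0..9 exist in the partition
        Long lastCommitted = 4L; // 0..4 committed, 5 failed, 6..9 already emitted

        System.out.println(offsetsReFetched(positionWithSeekToEnd(logEnd), logEnd));                     // []
        System.out.println(offsetsReFetched(positionWithProposedFix(lastCommitted, logStart), logEnd));  // [5, 6, 7, 8, 9]
        System.out.println(offsetsReFetched(positionWithProposedFix(null, logStart), logEnd));           // [0, 1, ..., 9]
    }
}
```

With seekToEnd nothing is fetched again, so the failed offset 5 is lost; with the proposed seek, 5..9 are fetched again. Whether the already-acked offsets 6..9 are re-emitted depends on the spout's own bookkeeping, which this model does not cover.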
      


    People

    • Assignee: Unassigned
    • Reporter: db3f Matthias Klein
    • Votes: 0
    • Watchers: 2
