Apache Storm / STORM-2352

New Kafka spout retries forever even with maxRetries of 5


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.0.0, 1.1.0
    • Fix Version/s: None
    • Component/s: storm-kafka-client
    • Labels: None

    Description

      Affects v1.0.0 and above.

      A KafkaSpout is created with a KafkaSpoutConfig whose maxRetries is 5, yet the KafkaSpout retries the failed tuple forever.

      Reason:
      numFails is incremented in the fail() method of KafkaSpout:

          public void fail(Object messageId) {
              final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
              emitted.remove(msgId);
              if (msgId.numFails() < maxRetries) {
                  msgId.incrementNumFails();
                  retryService.schedule(msgId);
              } else { // limit to max number of retries
                  LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId);
                  ack(msgId);
              }
          }
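      Taken on its own, this branch does bound the retries, provided the same KafkaSpoutMessageId instance reaches fail() every time. The sketch below is a simplified model, not the storm-kafka-client code: RetryModel, MessageId and failUntilAcked() are hypothetical names, and plain counters stand in for retryService.schedule() and ack(). With maxRetries of 5, failing one reused instance schedules 5 retries before the message is marked as acked.

```java
// Simplified model of the fail() branch quoted above. RetryModel, MessageId and
// failUntilAcked() are hypothetical names, not part of storm-kafka-client.
public class RetryModel {

    /** Hypothetical stand-in for KafkaSpoutMessageId: only the fail counter matters here. */
    static final class MessageId {
        private int numFails = 0;
        int numFails() { return numFails; }
        void incrementNumFails() { numFails++; }
    }

    private final int maxRetries;
    private int scheduled = 0;      // number of retries scheduled so far
    private boolean acked = false;  // set once the retry limit is reached

    RetryModel(int maxRetries) { this.maxRetries = maxRetries; }

    /** Same branch structure as KafkaSpout.fail(). */
    void fail(MessageId msgId) {
        if (msgId.numFails() < maxRetries) {
            msgId.incrementNumFails();
            scheduled++;            // stands in for retryService.schedule(msgId)
        } else {
            acked = true;           // stands in for ack(msgId)
        }
    }

    /** Fails the SAME MessageId instance until it is acked; returns the retries scheduled. */
    static int failUntilAcked(int maxRetries) {
        RetryModel spout = new RetryModel(maxRetries);
        MessageId msgId = new MessageId();
        while (!spout.acked) {
            spout.fail(msgId);
        }
        return spout.scheduled;
    }

    public static void main(String[] args) {
        System.out.println("retries scheduled: " + failUntilAcked(5));
    }
}
```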
      

      However, emitTupleIfNotEmitted() creates a new KafkaSpoutMessageId for the record, checks whether that msgId is ready to be retried (in the case of a failure) and, if so, emits the new msgId instance. The numFails count from the previous attempt is lost, so the numFails() < maxRetries check in fail() never reaches the limit.

          private void emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
              final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
              final KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
      
              if (acked.containsKey(tp) && acked.get(tp).contains(msgId)) {   // has been acked
                  LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
              } else if (emitted.contains(msgId)) {   // has been emitted and it's pending ack or fail
                  LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
              } else if (!retryService.isScheduled(msgId) || retryService.isReady(msgId)) {   // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
                  final List<Object> tuple = tuplesBuilder.buildTuple(record);
                  kafkaSpoutStreams.emit(collector, tuple, msgId);
                  emitted.add(msgId);
                  numUncommittedOffsets++;
                  if (retryService.isReady(msgId)) { // has failed. Is it ready for retry ?
                      retryService.remove(msgId);  // re-emitted hence remove from failed
                  }
                  LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
              }
          }
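      The effect of building a new KafkaSpoutMessageId on every emit can be reproduced in a small model. This is a hypothetical sketch (LostFailCountDemo, MessageId and attemptsUntilAcked() are illustrative names, and the retry service's backoff is left out): it applies the same numFails check as fail() and only contrasts re-emitting the failed instance with creating a fresh one for the same record. Reusing the instance stops after 6 attempts (the original emit plus 5 retries); a fresh instance always starts at numFails == 0, so the limit is never reached and the model gives up only at its own attempt cap.

```java
// Hypothetical model of the bug: each re-emit may build a NEW message id for the
// same record, as emitTupleIfNotEmitted() does, so the fail counter restarts at 0.
// None of these names are storm-kafka-client API.
public class LostFailCountDemo {

    static final class MessageId {
        final long offset;          // the record this id refers to
        private int numFails = 0;
        MessageId(long offset) { this.offset = offset; }
        int numFails() { return numFails; }
        void incrementNumFails() { numFails++; }
    }

    /**
     * Emits and fails the record at offset 42 up to maxAttempts times.
     * If reuseId is false, a fresh MessageId is created on every emit (the bug).
     * Returns the attempt whose failure was turned into an ack,
     * or -1 if retries never stopped within maxAttempts.
     */
    static int attemptsUntilAcked(int maxRetries, boolean reuseId, int maxAttempts) {
        MessageId previous = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            // emit: reuse the failed id, or build a new one for the same record
            MessageId msgId = (reuseId && previous != null) ? previous : new MessageId(42L);
            // the tuple fails downstream; same check as KafkaSpout.fail()
            if (msgId.numFails() < maxRetries) {
                msgId.incrementNumFails();   // retry scheduled
                previous = msgId;
            } else {
                return attempt;              // marked as acked: retries stop
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println("reused id: " + attemptsUntilAcked(5, true, 100));
        System.out.println("fresh id:  " + attemptsUntilAcked(5, false, 100));
    }
}
```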
      

      isReady() has no side effects: it only looks up the msgId and returns whether it is ready to be retried. Possible fixes:
        • modify the RetryService interface so that it hands back the msgId it has scheduled (with its numFails), or
        • give isReady() a side effect that carries the numFails from the previous attempt over to the new msgId, or
        • add a 'failed' collection to KafkaSpout to keep track of failed messages (similar to acked) and, when isReady() is true, emit the msgId stored in failed.
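      The third option can be sketched as follows, under stated assumptions: FixedSpoutModel, Key, idFor() and emitsUntilGiveUp() are hypothetical names; the failed map is keyed by topic, partition and offset (presumably the same identity emitTupleIfNotEmitted() relies on when it looks up a freshly built msgId in acked and in the retry service); and the isReady() backoff check is omitted, so every failed record is re-emitted immediately. Because idFor() returns the stored instance, numFails accumulates and the spout gives up after the original emit plus maxRetries retries. The Key record needs Java 16 or later.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of fix option 3: keep failed message ids in a `failed` map and re-emit
// the stored instance so that numFails survives across retries.
// All names are hypothetical; this is not the storm-kafka-client implementation.
public class FixedSpoutModel {

    /** Hypothetical identity of a record: topic, partition and offset. */
    record Key(String topic, int partition, long offset) {}

    static final class MessageId {
        final Key key;
        private int numFails = 0;
        MessageId(Key key) { this.key = key; }
        int numFails() { return numFails; }
        void incrementNumFails() { numFails++; }
    }

    private final int maxRetries;
    private final Map<Key, MessageId> failed = new HashMap<>();  // analogous to `acked`
    private int ackedCount = 0;

    FixedSpoutModel(int maxRetries) { this.maxRetries = maxRetries; }

    /** Returns the message id to emit, reusing the failed instance if one exists. */
    MessageId idFor(String topic, int partition, long offset) {
        Key key = new Key(topic, partition, offset);
        MessageId previous = failed.remove(key);  // re-emitted, so no longer pending retry
        return previous != null ? previous : new MessageId(key);
    }

    /** Same branch structure as KafkaSpout.fail(), plus bookkeeping in `failed`. */
    void fail(MessageId msgId) {
        if (msgId.numFails() < maxRetries) {
            msgId.incrementNumFails();
            failed.put(msgId.key, msgId);  // remember this instance for the next emit
        } else {
            ackedCount++;                  // stands in for ack(msgId)
        }
    }

    /** Emits and fails one record until the spout gives up; returns the number of emits. */
    static int emitsUntilGiveUp(int maxRetries) {
        FixedSpoutModel spout = new FixedSpoutModel(maxRetries);
        int emits = 0;
        while (spout.ackedCount == 0) {
            MessageId msgId = spout.idFor("topic", 0, 42L);
            emits++;
            spout.fail(msgId);  // downstream always fails the tuple
        }
        return emits;
    }

    public static void main(String[] args) {
        System.out.println("emits with maxRetries=5: " + emitsUntilGiveUp(5));
    }
}
```

      In this model, keeping the original instance is the smallest change; the first option would achieve the same effect by having the retry service return its stored msgId instead.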

      Attachments

        1. KafkaSpoutTest.java
          11 kB
          Kishore Senji

            People

              Assignee: Unassigned
              Reporter: Kishore Senji (ksenji)
              Votes: 0
              Watchers: 4
