Uploaded image for project: 'Apache Storm'
  1. Apache Storm
  2. STORM-3145

UNCOMMITTED_EARLIEST and UNCOMMITTED_LATEST doesn't work as expected

    XMLWordPrintableJSON

Details

    Description

      I am using storm-kafka-client 1.2.2.

      When I ran my topology I got NPE as mentioned in https://issues.apache.org/jira/browse/STORM-3046.

      So I modified the KafkaTridentSpoutEmitter.java#seek with patch mentioned in the Jira STORM-3046. Below is the modified code.

      private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMeta, long transactionId) {
      
      if (isFirstPoll(tp, transactionId)) {
      
      if (firstPollOffsetStrategy == EARLIEST) {
      
      LOG.debug("First poll for topic partition [{}], seeking to partition beginning", tp);
      
      kafkaConsumer.seekToBeginning(Collections.singleton(tp));
      
      } else if (firstPollOffsetStrategy == LATEST) {
      
      LOG.debug("First poll for topic partition [{}], seeking to partition end", tp);
      
      kafkaConsumer.seekToEnd(Collections.singleton(tp));
      
      } else if (lastBatchMeta != null) {
      
      LOG.debug("First poll for topic partition [{}], using last batch metadata", tp);
      
      kafkaConsumer.seek(tp, lastBatchMeta.getLastOffset() + 1); // seek next offset after last offset from previous batch
      
      } else if (firstPollOffsetStrategy == UNCOMMITTED_EARLIEST) {
      
      LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition beginning", tp);
      
      kafkaConsumer.seekToBeginning(Collections.singleton(tp));
      
      } else if (firstPollOffsetStrategy == UNCOMMITTED_LATEST) {
      
      LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition end", tp);
      
      kafkaConsumer.seekToEnd(Collections.singleton(tp));
      
      }
      
      firstPollTransaction.put(tp, transactionId);
      
      } else if (lastBatchMeta != null) {
              kafkaConsumer.seek(tp, lastBatchMeta.getLastOffset() + 1);  // seek next offset after last offset from previous batch
              LOG.debug("First poll for topic partition [{}], using last batch metadata", tp);
      } else {
              long initialFetchOffset = firstPollTransaction.get(tp);
              OffsetAndMetadata lastCommittedOffset = kafkaConsumer.committed(tp);
              kafkaConsumer.seek(tp, lastCommittedOffset.offset());
              LOG.debug("First poll for topic partition [{}], no last batch metadata present."
                        + " Using stored initial fetch offset [{}]", tp, initialFetchOffset);
      }
      
          final long fetchOffset = kafkaConsumer.position(tp);
          LOG.debug("Set [fetchOffset = {}] for partition [{}]", fetchOffset, tp);
          return fetchOffset;
      }

      Now after code change - when the offset strategy is UNCOMMITTED_EARLIEST for the very first time when it is the first poll for the spout, lastBatchMeta is null and the kafka consumer seeks to the beginning offset. Should I be checking for last commit and starting from there? Likewise for the next fetch (when lastBatchMeta is not null), kafka consumer seeks to lastBatchMeta.getLastOffset() + 1. Should I be doing same here, checking for last commit and starting from there?

       

      Attachments

        Activity

          People

            Unassigned Unassigned
            prakashnm Prakash N M
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: