Apache Hudi / HUDI-6502

Fix NPE when there is a mismatch in the number of Kafka partitions

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: deltastreamer
    • Labels: None

    Description

      Let's say the latest checkpoint in deltastreamer holds offsets for 5 Kafka partitions. The user then deleted and re-created the topic, and now there is only one Kafka partition. When checking for new offsets, we run into an NPE.

      We might need to fix the checkpoint parsing logic so that it only considers the partitions that exist in the topic now.
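
One possible shape for that fix, as a rough sketch only and not the actual Hudi change: drop checkpointed offsets for partitions that no longer exist before comparing anything. The class and method names below are hypothetical, and partitions are modeled as plain "topic-partition" strings instead of Kafka's TopicPartition so the example runs without the Kafka client.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class CheckpointPartitionFilter {

  // Hypothetical helper (not part of Hudi): keeps only the checkpointed
  // offsets whose partition is still present in the topic.
  public static Map<String, Long> retainCurrentPartitions(Map<String, Long> checkpointOffsets,
                                                          Set<String> currentPartitions) {
    Map<String, Long> retained = new HashMap<>();
    for (Map.Entry<String, Long> entry : checkpointOffsets.entrySet()) {
      if (currentPartitions.contains(entry.getKey())) {
        retained.put(entry.getKey(), entry.getValue());
      }
    }
    return retained;
  }

  public static void main(String[] args) {
    // Checkpoint written while the topic still had 5 partitions.
    Map<String, Long> checkpoint = new HashMap<>();
    for (int p = 0; p < 5; p++) {
      checkpoint.put("ABC-" + p, 100L);
    }
    // After the topic was re-created, only partition 0 exists.
    Set<String> current = Collections.singleton("ABC-0");
    System.out.println(retainCurrentPartitions(checkpoint, current));
  }
}
```

Note that an offset retained for a surviving partition was written against the old topic, so it may still need to be validated against the re-created topic's offset range.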

      Stack trace:

      23/07/06 16:07:52 ERROR DeltaStreamer  : Failed to run job for table: ABC
      java.lang.NullPointerException
      	at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.lambda$fetchValidOffsets$1(KafkaOffsetGen.java:404)
      	at java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90)
      	at java.util.HashMap$EntrySpliterator.tryAdvance(HashMap.java:1744)
      	at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
      	at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:499)
      	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:486)
      	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
      	at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
      	at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
      	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
      	at java.util.stream.ReferencePipeline.anyMatch(ReferencePipeline.java:516)
      	at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.fetchValidOffsets(KafkaOffsetGen.java:404)
      	at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.getNextOffsetRanges(KafkaOffsetGen.java:317)
      	at org.apache.hudi.utilities.sources.KafkaSource.fetchNewData(KafkaSource.java:67)
      	at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:105)
      	at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInRowFormat(SourceFormatAdapter.java:288)
      	at org.apache.hudi.utilities.deltastreamer.DeltaSync.fetchFromSource(DeltaSync.java:477)
      	at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:451)
      	at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:358)
      	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.ingestOnce(HoodieDeltaStreamer.java:876)
      	at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
      	at 

      Code snippet of interest 

      private Map<TopicPartition, Long> fetchValidOffsets(KafkaConsumer consumer,
                                                            Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions) {
        Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(topicPartitions);
        Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
        boolean isCheckpointOutOfBounds = checkpointOffsets.entrySet().stream()
            .anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey())); 

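      The last line, where we call earliestOffsets.get(offset.getKey()), runs into the NPE. The checkpoint still contains partitions that are not in earliestOffsets, so get() returns null for them, and unboxing that null for the < comparison throws.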
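
The failure can be reproduced outside Hudi with plain maps. The sketch below is illustrative only: partitions are modeled as "topic-partition" strings rather than TopicPartition, and nullSafeCheck is a hypothetical variant, not the committed fix. It assumes a checkpointed partition that is missing from earliestOffsets should count as out of bounds; dropping such partitions instead is an equally plausible choice.

```java
import java.util.HashMap;
import java.util.Map;

public class OutOfBoundsCheck {

  // Same predicate as in the snippet above: throws NullPointerException when a
  // checkpointed partition is missing from earliestOffsets, because the null
  // returned by get() is unboxed for the '<' comparison.
  public static boolean unsafeCheck(Map<String, Long> checkpointOffsets,
                                    Map<String, Long> earliestOffsets) {
    return checkpointOffsets.entrySet().stream()
        .anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey()));
  }

  // Hypothetical null-safe variant: a partition missing from earliestOffsets
  // is treated as out of bounds instead of being unboxed.
  public static boolean nullSafeCheck(Map<String, Long> checkpointOffsets,
                                      Map<String, Long> earliestOffsets) {
    return checkpointOffsets.entrySet().stream().anyMatch(offset -> {
      Long earliest = earliestOffsets.get(offset.getKey());
      return earliest == null || offset.getValue() < earliest;
    });
  }

  public static void main(String[] args) {
    // Checkpoint with 5 partitions, topic re-created with 1 partition.
    Map<String, Long> checkpoint = new HashMap<>();
    for (int p = 0; p < 5; p++) {
      checkpoint.put("ABC-" + p, 100L);
    }
    Map<String, Long> earliest = new HashMap<>();
    earliest.put("ABC-0", 0L);

    try {
      unsafeCheck(checkpoint, earliest);
    } catch (NullPointerException e) {
      System.out.println("unsafeCheck threw NullPointerException");
    }
    System.out.println("nullSafeCheck: " + nullSafeCheck(checkpoint, earliest));
  }
}
```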

       

          People

            Assignee: Unassigned
            Reporter: sivabalan narayanan (shivnarayan)