Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Description
Let's say the latest checkpoint in DeltaStreamer holds offsets for 5 Kafka partitions. The user then deleted and re-created the topic, which now has only one partition. So when checking for new offsets, we run into an NPE.
We might need to fix the parsing logic so that it only considers the partitions of the re-created topic.
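One possible direction for that fix, as a sketch under assumptions: restrict the out-of-bounds check to checkpointed partitions that still exist, and build starting offsets only for the current partitions, letting partitions missing from the checkpoint start at their earliest offset. This is a hypothetical, dependency-free illustration, not Hudi's code: partitions are modeled as Strings (e.g. "topic-0") instead of Kafka's TopicPartition, and CheckpointReconciler, isCheckpointOutOfBounds and retainCurrentPartitions are made-up names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not Hudi's actual fix. Partitions are modeled as Strings
// such as "topic-0" instead of Kafka's TopicPartition to avoid a kafka-clients dependency.
class CheckpointReconciler {

  // Out-of-bounds check restricted to partitions that still exist:
  // checkpoint entries with no matching key in earliestOffsets are skipped
  // instead of being compared against a null Long.
  static boolean isCheckpointOutOfBounds(Map<String, Long> checkpointOffsets,
                                         Map<String, Long> earliestOffsets) {
    return checkpointOffsets.entrySet().stream()
        .filter(offset -> earliestOffsets.containsKey(offset.getKey()))
        .anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey()));
  }

  // Starting offsets for the current partitions only:
  // - partitions absent from the checkpoint start at their earliest offset,
  // - partitions present in both keep their checkpointed offset,
  // - checkpoint entries for partitions that no longer exist are dropped.
  // Whether an out-of-bounds checkpointed offset is reset is left to the caller.
  static Map<String, Long> retainCurrentPartitions(Map<String, Long> checkpointOffsets,
                                                   Map<String, Long> earliestOffsets) {
    Map<String, Long> result = new HashMap<>(earliestOffsets);
    checkpointOffsets.forEach((partition, offset) -> {
      if (earliestOffsets.containsKey(partition)) {
        result.put(partition, offset);
      }
    });
    return result;
  }
}
```

With a 5-partition checkpoint and a re-created single-partition topic, isCheckpointOutOfBounds no longer throws, and retainCurrentPartitions returns an entry for topic-0 only.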
Stack trace:
23/07/06 16:07:52 ERROR DeltaStreamer : Failed to run job for table: ABC
java.lang.NullPointerException
at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.lambda$fetchValidOffsets$1(KafkaOffsetGen.java:404)
at java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90)
at java.util.HashMap$EntrySpliterator.tryAdvance(HashMap.java:1744)
at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:499)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:486)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
at java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.anyMatch(ReferencePipeline.java:516)
at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.fetchValidOffsets(KafkaOffsetGen.java:404)
at org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.getNextOffsetRanges(KafkaOffsetGen.java:317)
at org.apache.hudi.utilities.sources.KafkaSource.fetchNewData(KafkaSource.java:67)
at org.apache.hudi.utilities.sources.Source.fetchNext(Source.java:105)
at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInRowFormat(SourceFormatAdapter.java:288)
at org.apache.hudi.utilities.deltastreamer.DeltaSync.fetchFromSource(DeltaSync.java:477)
at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:451)
at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:358)
at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.ingestOnce(HoodieDeltaStreamer.java:876)
at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
at
Code snippet of interest
private Map<TopicPartition, Long> fetchValidOffsets(KafkaConsumer consumer, Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions) {
    Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(topicPartitions);
    Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
    boolean isCheckpointOutOfBounds = checkpointOffsets.entrySet().stream()
        .anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey()));
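The last line, where we call earliestOffsets.get(offset.getKey()), runs into the NPE. earliestOffsets only has entries for the topic's current partitions, so get() returns null for a checkpointed partition that no longer exists, and unboxing that null Long for the < comparison throws.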
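A minimal, dependency-free reproduction of this failure mode, assuming partitions are modeled as Strings rather than Kafka's TopicPartition (UnboxingNpeDemo and anyBelowEarliest are hypothetical names). The lambda mirrors the quoted anyMatch: with 5 checkpointed partitions and only topic-0 present in earliestOffsets, no entry evaluates to true before a missing key is reached, so the unboxing NPE is guaranteed regardless of HashMap iteration order.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical reproduction; Strings stand in for Kafka's TopicPartition.
class UnboxingNpeDemo {

  // Same shape as the quoted check: true if any checkpointed offset is below the earliest offset.
  static boolean anyBelowEarliest(Map<String, Long> checkpointOffsets,
                                  Map<String, Long> earliestOffsets) {
    return checkpointOffsets.entrySet().stream()
        // earliestOffsets.get(...) returns null for a partition missing from the map;
        // the '<' comparison unboxes that null Long and throws NullPointerException.
        .anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey()));
  }

  public static void main(String[] args) {
    // Checkpoint taken when the topic had 5 partitions.
    Map<String, Long> checkpoint = new HashMap<>();
    for (int p = 0; p < 5; p++) {
      checkpoint.put("topic-" + p, 100L);
    }
    // Re-created topic: only one partition exists now.
    Map<String, Long> earliest = new HashMap<>();
    earliest.put("topic-0", 0L);
    try {
      anyBelowEarliest(checkpoint, earliest);
      System.out.println("no exception");
    } catch (NullPointerException e) {
      System.out.println("NullPointerException"); // this branch is taken
    }
  }
}
```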