Details
Type: Bug
Status: Open
Priority: Minor
Resolution: Unresolved
Description
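When the previous offset is out of range, offsets.startAt(previousOffset) throws StartOffsetOutOfRangeException, so the startOffset in offsets is never updated.
As a result, the logic below cannot use offsets.getStartOffset() in place of previousOffset: inside the catch block it does not return the out-of-range value, so the too-early/too-late counts, the log message, and the nearest-offset decision are all based on the wrong offset.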
    try {
      offsets.startAt(previousOffset);
    } catch (StartOffsetOutOfRangeException e) {

      // Increment counts, which will be reported as job metrics
      if (offsets.getStartOffset() <= offsets.getLatestOffset()) {
        this.offsetTooEarlyCount.incrementAndGet();
      } else {
        this.offsetTooLateCount.incrementAndGet();
      }

      // When previous offset is out of range, either start at earliest, latest or nearest offset, or skip the
      // partition. If skipping, need to create an empty workunit so that previousOffset is persisted.
      String offsetOutOfRangeMsg = String.format(
          "Start offset for partition %s is out of range. Start offset = %d, earliest offset = %d, latest offset = %d.",
          partition, offsets.getStartOffset(), offsets.getEarliestOffset(), offsets.getLatestOffset());
      String offsetOption =
          state.getProp(RESET_ON_OFFSET_OUT_OF_RANGE, DEFAULT_RESET_ON_OFFSET_OUT_OF_RANGE).toLowerCase();
      if (offsetOption.equals(LATEST_OFFSET)
          || (offsetOption.equals(NEAREST_OFFSET) && offsets.getStartOffset() >= offsets.getLatestOffset())) {
        LOG.warn(
            offsetOutOfRangeMsg + "This partition will start from the latest offset: " + offsets.getLatestOffset());
        offsets.startAtLatestOffset();
      } else if (offsetOption.equals(EARLIEST_OFFSET) || offsetOption.equals(NEAREST_OFFSET)) {
        LOG.warn(offsetOutOfRangeMsg + "This partition will start from the earliest offset: "
            + offsets.getEarliestOffset());
        offsets.startAtEarliestOffset();
      } else {
        LOG.warn(offsetOutOfRangeMsg + "This partition will be skipped.");
        return createEmptyWorkUnit(partition, previousOffset, topicSpecificState);
      }
    }
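One possible fix is to compare previousOffset directly inside the catch block. The sketch below illustrates the idea with simplified stand-ins: the Offsets class, the exception, the option strings ("latest", "earliest", "nearest"), the resolveStartOffset helper, and the assumption that startOffset defaults to the earliest offset are all hypothetical and only mimic the behaviour described above. It is not the project's actual implementation.

```java
import java.util.Locale;

final class OffsetResetSketch {

    /** Hypothetical stand-in for the exception thrown by startAt(). */
    static final class StartOffsetOutOfRangeException extends Exception {
        StartOffsetOutOfRangeException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for the offsets holder used in the snippet above. */
    static final class Offsets {
        private final long earliestOffset;
        private final long latestOffset;
        private long startOffset;

        Offsets(long earliestOffset, long latestOffset) {
            this.earliestOffset = earliestOffset;
            this.latestOffset = latestOffset;
            this.startOffset = earliestOffset; // assumed default, for illustration only
        }

        void startAt(long offset) throws StartOffsetOutOfRangeException {
            if (offset < earliestOffset || offset > latestOffset) {
                // startOffset is left unchanged, which is the behaviour this issue describes
                throw new StartOffsetOutOfRangeException("offset " + offset + " is out of range");
            }
            startOffset = offset;
        }

        void startAtEarliestOffset() { startOffset = earliestOffset; }
        void startAtLatestOffset() { startOffset = latestOffset; }
        long getStartOffset() { return startOffset; }
        long getEarliestOffset() { return earliestOffset; }
        long getLatestOffset() { return latestOffset; }
    }

    /** Returns true if the partition should be pulled, false if it should be skipped. */
    static boolean resolveStartOffset(Offsets offsets, long previousOffset, String resetOption) {
        try {
            offsets.startAt(previousOffset);
            return true;
        } catch (StartOffsetOutOfRangeException e) {
            String option = resetOption.toLowerCase(Locale.ROOT);
            // Compare previousOffset, not offsets.getStartOffset(), which was never updated.
            boolean previousIsTooLate = previousOffset >= offsets.getLatestOffset();
            if (option.equals("latest") || (option.equals("nearest") && previousIsTooLate)) {
                offsets.startAtLatestOffset();
                return true;
            } else if (option.equals("earliest") || option.equals("nearest")) {
                offsets.startAtEarliestOffset();
                return true;
            }
            return false; // any other option: skip the partition
        }
    }

    public static void main(String[] args) {
        Offsets offsets = new Offsets(100L, 200L);
        boolean pull = resolveStartOffset(offsets, 500L, "nearest");
        System.out.println("pull=" + pull + ", startOffset=" + offsets.getStartOffset());
    }
}
```

With an earliest offset of 100 and a latest offset of 200, a previous offset of 500 under the "nearest" option resolves to the latest offset. In this sketch, comparing the stale getStartOffset() value instead would fall through to the earliest offset. The same substitution would apply to the too-early/too-late counters and to the "Start offset = %d" argument of the log message.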