Details
-
Bug
-
Status: Closed
-
Major
-
Resolution: Fixed
-
None
-
4
Description
The offset computation fails with an invalid offset range (a range whose start offset is greater than its end offset) for the following edge case.
/**
 * Reproduces the reported edge case for offset-range computation.
 *
 * <p>Builds committed and latest offset maps for partitions 0-4, calls
 * {@code KafkaOffsetGen.CheckpointUtils.computeOffsetRanges} with 400000 and 20 as the
 * trailing arguments, and then checks two things: the total number of new messages across
 * the returned ranges equals 400000, and no returned range starts after it ends.
 *
 * @throws IllegalArgumentException if any returned range has {@code fromOffset} greater
 *     than {@code untilOffset}
 */
@Test
public void testBug() {
  final int eventLimit = 400000;
  final int[] partitionIds = {0, 1, 2, 3, 4};
  final long[] fromOffsets = {76888767, 76725043, 76899767, 76833267, 76952055};
  final long[] toOffsets = {77005407, 76768151, 76985456, 76917973, 77080447};

  final OffsetRange[] computed =
      KafkaOffsetGen.CheckpointUtils.computeOffsetRanges(
          makeOffsetMap(partitionIds, fromOffsets),
          makeOffsetMap(partitionIds, toOffsets),
          eventLimit,
          20);

  // The computed ranges must account for exactly the requested number of messages.
  final long newMessageCount = KafkaOffsetGen.CheckpointUtils.totalNewMessages(computed);
  assertEquals(eventLimit, newMessageCount);

  // Every range must be well-formed: its start may not lie beyond its end.
  for (OffsetRange candidate : computed) {
    if (candidate.fromOffset() <= candidate.untilOffset()) {
      continue;
    }
    throw new IllegalArgumentException("Invalid offset range " + candidate);
  }
}