Details
Type: Bug
Status: Closed
Priority: Major
Resolution: Fixed
None
None
Description
Description of Bug
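CopyOnWriteInputFormat#getBlockIndexForPosition() requires the BlockLocation array to be sorted by offset in ascending order.
However, the current comparator does not compare offsets at all: it returns the sign of o1.getLength() - o2.getOffset(). It is therefore not a valid ordering, and the "sorted" BlockLocation array is not guaranteed to be in ascending offset order.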
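To illustrate why the ordering matters, here is a simplified sketch of a forward-scanning lookup. It assumes that getBlockIndexForPosition() behaves like the method in Flink's FileInputFormat that it appears to be adapted from: each search starts at the block index found for the previous split, so blocks earlier in the array are never revisited. The Block class, splitBlockIndexes() and the fixed split size are hypothetical simplifications, and the half-split-size adjustment of the real method is omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch; assumes a Flink-FileInputFormat-style forward scan.
// Block and splitBlockIndexes() are hypothetical, not Hudi or Hadoop APIs.
class ForwardScanDemo {

    /** Hypothetical stand-in for org.apache.hadoop.fs.BlockLocation (offset and length only). */
    static final class Block {
        final long offset;
        final long length;

        Block(long offset, long length) {
            this.offset = offset;
            this.length = length;
        }
    }

    /** Scans forward from startIndex; blocks before startIndex are never checked. */
    static int blockIndexForPosition(Block[] blocks, long position, int startIndex) {
        for (int i = startIndex; i < blocks.length; i++) {
            long start = blocks[i].offset;
            long end = start + blocks[i].length;
            if (position >= start && position < end) {
                return i;
            }
        }
        throw new IllegalArgumentException("The given offset is not contained in the any block.");
    }

    /** Cuts [0, fileLength) into fixed-size splits, reusing the previous block index as the next start. */
    static List<Integer> splitBlockIndexes(Block[] blocks, long fileLength, long splitSize) {
        List<Integer> indexes = new ArrayList<>();
        int blockIndex = 0;
        for (long position = 0; position < fileLength; position += splitSize) {
            blockIndex = blockIndexForPosition(blocks, position, blockIndex);
            indexes.add(blockIndex);
        }
        return indexes;
    }

    public static void main(String[] args) {
        Block[] sorted = {new Block(0, 5), new Block(5, 5), new Block(10, 5)};
        Block[] unsorted = {new Block(0, 5), new Block(10, 5), new Block(5, 5)};
        System.out.println(splitBlockIndexes(sorted, 15, 5)); // [0, 1, 2]
        try {
            splitBlockIndexes(unsorted, 15, 5);
        } catch (IllegalArgumentException e) {
            // Position 5 is found at index 2, so the search for position 10 starts past index 1.
            System.out.println("unsorted: " + e.getMessage());
        }
    }
}
```

With the unsorted array, the split at position 5 lands on index 2, and the next search starts there and misses the block at index 1, which matches the exception in the stack trace.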
Stacktrace
Caused by: java.lang.IllegalArgumentException: The given offset is not contained in the any block.
	at org.apache.hudi.table.format.cow.CopyOnWriteInputFormat.getBlockIndexForPosition(CopyOnWriteInputFormat.java:374)
	at org.apache.hudi.table.format.cow.CopyOnWriteInputFormat.createInputSplits(CopyOnWriteInputFormat.java:242)
	at org.apache.hudi.table.format.cow.CopyOnWriteInputFormat.createInputSplits(CopyOnWriteInputFormat.java:66)
	at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:234)
	... 21 more
Reproduction of issue
Current sorting implementation
Arrays.sort(blocks, new Comparator<BlockLocation>() {
    @Override
    public int compare(BlockLocation o1, BlockLocation o2) {
        // Bug: subtracts o2's offset from o1's length instead of comparing the two offsets
        long diff = o1.getLength() - o2.getOffset();
        return Long.compare(diff, 0L);
    }
});
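A minimal sketch of an offset-based comparator follows. It uses a hypothetical Block class in place of org.apache.hadoop.fs.BlockLocation so that it runs without Hadoop on the classpath, and it sorts the same three blocks as the test below twice, getting the same order both times.

```java
import java.util.Arrays;
import java.util.Comparator;

// Sketch only: Block is a hypothetical stand-in for org.apache.hadoop.fs.BlockLocation.
class OffsetSortDemo {

    static final class Block {
        private final long offset;
        private final long length;

        Block(long offset, long length) {
            this.offset = offset;
            this.length = length;
        }

        long getOffset() { return offset; }

        long getLength() { return length; }

        // Mirrors the "offset,length" prefix of BlockLocation#toString()
        @Override
        public String toString() { return offset + "," + length; }
    }

    /** Orders blocks by start offset, ascending; comparingLong uses Long.compare, so no subtraction overflow. */
    static final Comparator<Block> BY_OFFSET = Comparator.comparingLong(Block::getOffset);

    /** Returns a sorted copy, leaving the input array untouched. */
    static Block[] sortByOffset(Block[] blocks) {
        Block[] copy = blocks.clone();
        Arrays.sort(copy, BY_OFFSET);
        return copy;
    }

    public static void main(String[] args) {
        Block[] blocks = {new Block(0, 5), new Block(5, 5), new Block(6, 4)};
        Block[] once = sortByOffset(blocks);
        Block[] twice = sortByOffset(once);
        System.out.println(Arrays.toString(once));  // [0,5, 5,5, 6,4]
        System.out.println(Arrays.toString(twice)); // [0,5, 5,5, 6,4]
    }
}
```

Against the real type, the equivalent would presumably be Arrays.sort(blocks, Comparator.comparingLong(BlockLocation::getOffset)), or Long.compare(o1.getOffset(), o2.getOffset()) inside the existing anonymous comparator. This is a suggested shape, not necessarily the change that closed this issue.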
Test
import java.util.Arrays;

import org.apache.hadoop.fs.BlockLocation;
import org.junit.jupiter.api.Test;

public class TestBlockLocationSort {

    // Same logic as the current comparator in CopyOnWriteInputFormat
    static int compare(BlockLocation o1, BlockLocation o2) {
        long diff = o1.getLength() - o2.getOffset();
        return Long.compare(diff, 0L);
    }

    @Test
    void testBlockLocationSort() {
        // BlockLocation(names, hosts, offset, length)
        BlockLocation o1 = new BlockLocation(new String[0], new String[0], 0, 5);
        BlockLocation o2 = new BlockLocation(new String[0], new String[0], 5, 5);
        BlockLocation o3 = new BlockLocation(new String[0], new String[0], 6, 4);
        BlockLocation[] blocks1 = {o1, o2, o3};

        System.out.println("BlockLocation[] bef. sort [pass 1]: " + Arrays.toString(blocks1));
        Arrays.sort(blocks1, TestBlockLocationSort::compare);
        System.out.println("BlockLocation[] aft. sort [pass 1]: " + Arrays.toString(blocks1) + "\n");

        // Sorting the already "sorted" array again should leave it unchanged
        System.out.println("BlockLocation[] bef. sort [pass 2]: " + Arrays.toString(blocks1));
        Arrays.sort(blocks1, TestBlockLocationSort::compare);
        System.out.println("BlockLocation[] aft. sort [pass 2]: " + Arrays.toString(blocks1) + "\n");
    }
}
Output
BlockLocation[] bef. sort [pass 1]: [0,5, 5,5, 6,4]
BlockLocation[] aft. sort [pass 1]: [0,5, 6,4, 5,5]

BlockLocation[] bef. sort [pass 2]: [0,5, 6,4, 5,5]
BlockLocation[] aft. sort [pass 2]: [0,5, 5,5, 6,4]
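As the output shows, the current BlockLocation sort is not idempotent: pass 1 moves the block at offset 6 ahead of the block at offset 5, and pass 2 swaps them back. The underlying cause is that the comparator violates the Comparator contract. For example, comparing the block at offset 5 (length 5) with the block at offset 6 (length 4) returns -1 in both argument orders, so each block is reported as smaller than the other.
Sorting should be idempotent: sorting a collection once puts it in order, and sorting the already-sorted array again should leave it unchanged.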
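The non-idempotent result follows from the comparator not defining a valid ordering. Below is a sketch of a brute-force check of one Comparator requirement, sgn(compare(x, y)) == -sgn(compare(y, x)), applied to the comparator from this issue and to an offset-only comparator. Block and isAntisymmetric() are hypothetical helpers written for illustration, not part of Hudi or Hadoop.

```java
import java.util.Comparator;

// Sketch only: Block is a hypothetical stand-in for org.apache.hadoop.fs.BlockLocation.
class ComparatorContractDemo {

    static final class Block {
        final long offset;
        final long length;

        Block(long offset, long length) {
            this.offset = offset;
            this.length = length;
        }
    }

    /** The comparator from this issue, transcribed onto the stand-in type. */
    static final Comparator<Block> BUGGY = (o1, o2) -> Long.compare(o1.length - o2.offset, 0L);

    /** Compares start offsets only. */
    static final Comparator<Block> BY_OFFSET = Comparator.comparingLong(b -> b.offset);

    /** Checks sgn(compare(x, y)) == -sgn(compare(y, x)) for every ordered pair, including x == y. */
    static boolean isAntisymmetric(Comparator<Block> cmp, Block[] blocks) {
        for (Block x : blocks) {
            for (Block y : blocks) {
                if (Integer.signum(cmp.compare(x, y)) != -Integer.signum(cmp.compare(y, x))) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Block[] blocks = {new Block(0, 5), new Block(5, 5), new Block(6, 4)};
        System.out.println("buggy: " + isAntisymmetric(BUGGY, blocks));         // false
        System.out.println("by offset: " + isAntisymmetric(BY_OFFSET, blocks)); // true
    }
}
```

The buggy comparator already fails on x == y for the block at offset 0 (it returns 1 instead of 0), so a sort algorithm that relies on the contract may place elements differently on each pass.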
Issue Links
- links to