Description
When we sort within partitions, we produce an UnsafeExternalRowSorter. This contains an UnsafeExternalSorter, which in turn holds the UnsafeExternalRowSorter.RowComparator.
The UnsafeExternalSorter registers a task completion listener that performs any remaining cleanup. As a result, we keep a reference to the UnsafeExternalRowSorter.RowComparator alive until the end of the task.
The RowComparator looks like this:

    private static final class RowComparator extends RecordComparator {
      private final Ordering<InternalRow> ordering;
      private final int numFields;
      private final UnsafeRow row1;
      private final UnsafeRow row2;

      RowComparator(Ordering<InternalRow> ordering, int numFields) {
        this.numFields = numFields;
        this.row1 = new UnsafeRow(numFields);
        this.row2 = new UnsafeRow(numFields);
        this.ordering = ordering;
      }

      @Override
      public int compare(Object baseObj1, long baseOff1, Object baseObj2, long baseOff2) {
        // TODO: Why are the sizes -1?
        row1.pointTo(baseObj1, baseOff1, -1);
        row2.pointTo(baseObj2, baseOff2, -1);
        return ordering.compare(row1, row2);
      }
    }
This means that row1 and row2 keep references to the last baseObj1 and baseObj2 passed to compare(), and these references are not tracked for purposes of memory allocation.
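To make the retention path concrete, here is a hypothetical, heavily simplified Java model of the pattern. It does not use Spark's classes: RowView, CachingComparator, completionListeners and sortOnePartition are illustrative names only. As in RowComparator, the comparator re-points two cached views at each pair of buffers it compares and never clears them; a plain list stands in for the task completion listener (in Spark the listener reaches the comparator through the sorter, which is collapsed here into a direct reference).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical model of the retention pattern; not Spark code.
class RetentionSketch {

    // Plays the role of UnsafeRow: a view that points at some backing buffer.
    static final class RowView {
        byte[] base;
        void pointTo(byte[] base) { this.base = base; }
    }

    // Plays the role of RowComparator: row1/row2 are reused for every
    // comparison and keep pointing at the last buffers they were given.
    static final class CachingComparator implements Comparator<byte[]> {
        final RowView row1 = new RowView();
        final RowView row2 = new RowView();

        @Override
        public int compare(byte[] a, byte[] b) {
            row1.pointTo(a);
            row2.pointTo(b);
            return Integer.compare(row1.base[0], row2.base[0]);
        }
    }

    // Stand-in for the task completion listeners: everything registered here
    // stays strongly reachable until the (simulated) task ends.
    static final List<Object> completionListeners = new ArrayList<>();

    static CachingComparator sortOnePartition(List<byte[]> records) {
        CachingComparator cmp = new CachingComparator();
        completionListeners.add(cmp); // registered for end-of-task cleanup
        records.sort(cmp);
        return cmp;
    }

    public static void main(String[] args) {
        List<byte[]> partition = new ArrayList<>();
        partition.add(new byte[] {3});
        partition.add(new byte[] {1});
        partition.add(new byte[] {2});
        CachingComparator cmp = sortOnePartition(partition);
        partition.clear(); // the caller no longer needs the records...
        // ...but the registered comparator still pins the last two buffers it compared.
        System.out.println(cmp.row1.base != null && cmp.row2.base != null);
    }
}
```

In this model, each additional partition sorted in the same task adds another comparator to the list, and each one pins its own last pair of buffers, mirroring how references accumulate across the sorts described below.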
We have a job that sorts within partitions and then coalesces partitions. It tends to OOM because of these lingering references to old UnsafeRows (and the memory behind them) that were used during sorting.
Attached is a screenshot of a memory dump taken during a task; our JVM has two executor threads.
It shows two references held inside row iterators, and 11 more that are reachable only through the task completion listeners or the memory-management bookkeeping.