Details
Type: Improvement
Status: Resolved
Priority: Minor
Resolution: Fixed
Affects Version/s: 2.0.0, 2.0.1, 2.0.2, 2.1.0, 2.1.1, 2.1.2, 2.2.0, 2.2.1
Fix Version/s: None
Description
For example, in BlockStoreShuffleReader.read(), if a key ordering is specified, the returned iterator is a CompletionIterator:
    dep.keyOrdering match {
      case Some(keyOrd: Ordering[K]) =>
        // Create an ExternalSorter to sort the data.
        val sorter =
          new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer)
        sorter.insertAll(aggregatedIter)
        context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
        context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
        context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
        CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
      case None =>
        aggregatedIter
    }
However, the sorter consumes the aggregatedIter (which may be an interruptible iterator) inside sorter.insertAll, and then returns a new iterator that is not interruptible. As a result, when a stage fails, the running task cannot be cancelled (unless interruptThread is enabled, which it is not by default), so the task keeps running and wastes executor resources.
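One possible remedy, sketched below rather than taken from any committed fix, is to re-wrap the final iterator in Spark's InterruptibleIterator, which checks the TaskContext for a kill request on every hasNext call. The asInterruptible helper name is hypothetical:

    import org.apache.spark.{InterruptibleIterator, TaskContext}

    // Hypothetical helper: re-wrap an iterator so that it observes task
    // cancellation again after a blocking step (such as sorter.insertAll)
    // has fully drained the original interruptible iterator.
    def asInterruptible[T](context: TaskContext, iter: Iterator[T]): Iterator[T] =
      iter match {
        // Already interruptible; no need for another wrapper.
        case interruptible: InterruptibleIterator[T] => interruptible
        // Otherwise, each hasNext first checks whether the task was killed.
        case other => new InterruptibleIterator[T](context, other)
      }

Applied to the snippet above, the CompletionIterator returned in the Some(keyOrd) branch would become asInterruptible(context, CompletionIterator(...)), so a task whose kill has been requested would stop at the next hasNext instead of draining the sorter's output to the end, even without interruptThread.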