Spark / SPARK-23040

BlockStoreShuffleReader's returned Iterator isn't interruptible if aggregator or ordering is specified


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 2.0.0, 2.0.1, 2.0.2, 2.1.0, 2.1.1, 2.1.2, 2.2.0, 2.2.1
    • Fix Version/s: 2.3.1, 2.4.0
    • Component/s: Spark Core
    • Labels: None

    Description

      For example, if an ordering is specified, the returned iterator is a CompletionIterator:

          dep.keyOrdering match {
            case Some(keyOrd: Ordering[K]) =>
              // Create an ExternalSorter to sort the data.
              val sorter =
                new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer)
              sorter.insertAll(aggregatedIter)
              context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
              context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
              context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
              CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
            case None =>
              aggregatedIter
          }
      

      However, the sorter consumes the aggregatedIter (which may be interruptible) in sorter.insertAll, and then produces a new iterator that isn't interruptible.
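
      Here, "interruptible" means wrapped in Spark's InterruptibleIterator, which re-checks the task's kill flag on every hasNext call. A simplified sketch of org.apache.spark.InterruptibleIterator:

          import org.apache.spark.TaskContext

          class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
            extends Iterator[T] {
            def hasNext: Boolean = {
              // Throws TaskKilledException if the task has been marked as killed,
              // so a cancelled task stops at the next element instead of running on.
              context.killTaskIfInterrupted()
              delegate.hasNext
            }
            def next(): T = delegate.next()
          }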

      The problem is that the Spark task then cannot be cancelled when its stage fails (without interruptThread enabled, which is disabled by default), so the task keeps running and wastes executor resources.
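
      One way to make the result interruptible again (a sketch only; resultIter here stands for whatever iterator the keyOrdering match above returns) is to re-wrap it in an InterruptibleIterator before handing it back:

          resultIter match {
            case _: InterruptibleIterator[Product2[K, C]] => resultIter
            case _ =>
              // The aggregator and/or sorter may have fully consumed the previous
              // interruptible iterator, so wrap once more to keep the returned
              // iterator responsive to task cancellation.
              new InterruptibleIterator[Product2[K, C]](context, resultIter)
          }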


          People

            Assignee: advancedxy YE
            Reporter: advancedxy YE
            Votes: 0
            Watchers: 3

            Dates

              Created:
              Updated:
              Resolved: