[SPARK-47146] Possible thread leak when doing sort merge join


    Description

      I have a long-running Spark job and stumbled upon an executor taking up a lot of threads, leaving no threads available on the server. Querying thread details via jstack shows a large number of threads named read-ahead. Checking the code confirms that these threads are created by ReadAheadInputStream, which creates a single-threaded thread pool when it is initialized:

      private final ExecutorService executorService =
          ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); 

      This thread pool is shut down only when ReadAheadInputStream#close() is called.
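      If close() is never called, each unread stream keeps one idle read-ahead thread alive. A minimal sketch of the effect (hypothetical, not from the ticket; it assumes the public constructor ReadAheadInputStream(InputStream, int) and that the pool's thread is started by the first read() call):

      // Hypothetical reproduction sketch, not from the ticket.
      import java.io.ByteArrayInputStream;
      import org.apache.spark.io.ReadAheadInputStream;

      public class ReadAheadLeakRepro {
        public static void main(String[] args) throws Exception {
          for (int i = 0; i < 1000; i++) {
            ReadAheadInputStream in =
                new ReadAheadInputStream(new ByteArrayInputStream(new byte[1024]), 1024);
            in.read(); // first read submits the async read-ahead task, starting the thread
            // close() is intentionally omitted here, mimicking a spill reader that
            // is never read through; the single-threaded pool is never shut down.
          }
          long alive = Thread.getAllStackTraces().keySet().stream()
              .filter(t -> t.getName().startsWith("read-ahead"))
              .count();
          System.out.println("read-ahead threads alive: " + alive); // grows with the loop
        }
      }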

      The call stack for close() in the normal case is:

      ts=2024-02-21 17:36:18;thread_name=Executor task launch worker for task 60.0 in stage 71.0 (TID 258);id=330;is_daemon=true;priority=5;TCCL=org.apache.spark.util.MutableURLClassLoader@17233230
          @org.apache.spark.io.ReadAheadInputStream.close()
              at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.close(UnsafeSorterSpillReader.java:149)
              at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.loadNext(UnsafeSorterSpillReader.java:121)
              at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillMerger$1.loadNext(UnsafeSorterSpillMerger.java:87)
              at org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.advanceNext(UnsafeExternalRowSorter.java:187)
              at org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:67)
              at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage27.processNext(null:-1)
              at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
              at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
              at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage29.smj_findNextJoinRows_0$(null:-1)
              at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage29.hashAgg_doAggregateWithKeys_1$(null:-1)
              at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage29.hashAgg_doAggregateWithKeys_0$(null:-1)
              at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage29.processNext(null:-1)
              at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
              at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:779)
              at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
              at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
              at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
              at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
              at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
              at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
              at org.apache.spark.scheduler.Task.run(Task.scala:139)
              at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
              at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
              at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
              at java.lang.Thread.run(Thread.java:829) 

      As shown in UnsafeSorterSpillReader#loadNext below, the stream is only closed once all records in it have been read through:

      @Override
      public void loadNext() throws IOException {
        // Kill the task in case it has been marked as killed. This logic is from
        // InterruptibleIterator, but we inline it here instead of wrapping the iterator in order
        // to avoid performance overhead. This check is added here in `loadNext()` instead of in
        // `hasNext()` because it's technically possible for the caller to be relying on
        // `getNumRecords()` instead of `hasNext()` to know when to stop.
        if (taskContext != null) {
          taskContext.killTaskIfInterrupted();
        }
        recordLength = din.readInt();
        keyPrefix = din.readLong();
        if (recordLength > arr.length) {
          arr = new byte[recordLength];
          baseObject = arr;
        }
        ByteStreams.readFully(in, arr, 0, recordLength);
        numRecordsRemaining--;
        if (numRecordsRemaining == 0) {
          close();
        }
      } 
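      One possible mitigation (a sketch of an idea only, not the ticket's actual fix) would be to register a task completion listener when the spill reader is created, so close() runs even if the iterator is never fully consumed:

      // Sketch only: assumes it runs inside UnsafeSorterSpillReader's
      // constructor, where taskContext and close() are in scope.
      if (taskContext != null) {
        taskContext.addTaskCompletionListener(ctx -> {
          try {
            close(); // also shuts down the read-ahead thread pool
          } catch (IOException e) {
            // the task is finishing anyway; nothing useful to recover here
          }
        });
      }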

      In a sort merge inner join, if either the streamed-side or the buffered-side iterator reaches its end, the not-yet-exhausted iterator on the other side is never read any further. A similar situation exists for left and right outer joins.

      In short, for several sort merge join types, a thread leak can occur when the amount of data is large enough to trigger a spill and some spill iterators are never read through, so close() is never called and their read-ahead thread pools are never shut down.
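      A hypothetical way to reproduce (an assumption on my side, not a confirmed recipe): force a sort merge inner join with spilling and make one side much smaller, so its iterator is exhausted first:

      // Assumptions: disabling broadcast joins forces a sort merge join, and
      // spark.shuffle.spill.numElementsForceSpillThreshold makes the sorter spill early.
      import org.apache.spark.sql.Dataset;
      import org.apache.spark.sql.Row;
      import org.apache.spark.sql.SparkSession;

      public class SortMergeJoinLeakRepro {
        public static void main(String[] args) {
          SparkSession spark = SparkSession.builder()
              .appName("smj-read-ahead-leak")
              .master("local[*]")
              .config("spark.sql.autoBroadcastJoinThreshold", "-1") // force sort merge join
              .config("spark.shuffle.spill.numElementsForceSpillThreshold", "1000")
              .getOrCreate();

          Dataset<Row> left = spark.range(0, 10_000_000L).withColumnRenamed("id", "k");
          Dataset<Row> right = spark.range(0, 1_000L).withColumnRenamed("id", "k");

          // Inner join: once the smaller side's iterator is exhausted, spill
          // readers on the other side may never be read through, so their
          // close() never runs and "read-ahead" threads accumulate.
          System.out.println(left.join(right, "k").count());
        }
      }

      If the leak is present, jstack on the executor should show the number of read-ahead threads growing as such queries run.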

       


    People

      Assignee: JacobZheng
      Reporter: JacobZheng
