SPARK-44003

DynamicPartitionDataSingleWriter is being starved by Parquet MemoryManager

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.3.1
    • Fix Version/s: None
    • Component/s: Spark Core
    • Labels: None

    Description

      We have a pyspark job that writes to a partitioned parquet dataset via:

      df.write.parquet(
        path=path,
        compression="snappy",
        mode="overwrite",
        partitionBy="year",
      )
      

      In this specific production case we partition by 28 distinct years, which gives 28 directories with 200 part files each, 5,600 files in total. The job runs on a single dedicated, ephemeral VM. Most of the time the VM is far from saturated, yet the job is very slow; it is neither IO bound nor CPU bound. Here's an annotated VM utilization graph. The blue line is CPU and the turquoise line is memory. The graph doesn't show IO, but we monitored that separately and it was not saturated either. The labels on the graph are:

      • BQ: can be ignored
      • SPARK~1: Spark computes some data
      • SPARK~2: the first slow period
      • SPARK~3: the second slow period

      We took two 10-minute JFR profiles, marked P-1 and P-2 in the graph above. P-1 falls entirely within SPARK~2; P-2 starts in SPARK~2 but lies mostly in SPARK~3. Here's the P-1 profile, and here's the P-2 profile.

      The picture becomes clearer when we look at the locks; here's the report. Threads were blocked on locks for a total of 20.5h, mostly on the global org.apache.parquet.hadoop.MemoryManager, which has two synchronized methods: addWriter and removeWriter. From the parquet-mr GitHub source:

        /**
         * Add a new writer and its memory allocation to the memory manager.
         * @param writer the new created writer
         * @param allocation the requested buffer size
         */
        synchronized void addWriter(InternalParquetRecordWriter<?> writer, Long allocation) {
          Long oldValue = writerList.get(writer);
          if (oldValue == null) {
            writerList.put(writer, allocation);
          } else {
            throw new IllegalArgumentException("[BUG] The Parquet Memory Manager should not add an " +
                "instance of InternalParquetRecordWriter more than once. The Manager already contains " +
                "the writer: " + writer);
          }
          updateAllocation();
        }
      
        /**
         * Remove the given writer from the memory manager.
         * @param writer the writer that has been closed
         */
        synchronized void removeWriter(InternalParquetRecordWriter<?> writer) {
          writerList.remove(writer);
          if (!writerList.isEmpty()) {
            updateAllocation();
          }
        }
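
To make the contention pattern concrete, here is a toy Python model of a process-wide manager whose add and remove methods share one lock and rescan every registered writer on each call. It is a sketch only, not parquet-mr code: `ToyMemoryManager`, `run_task`, `simulate`, the pool size, and the per-writer allocation are all hypothetical, and the model assumes one writer per part file (200 tasks, each opening and closing 28 writers in turn).

```python
import threading


class ToyMemoryManager:
    """Hypothetical stand-in for a global manager guarded by a single lock."""

    def __init__(self, pool_bytes):
        self._lock = threading.Lock()
        self._pool_bytes = pool_bytes
        self._writers = {}          # writer id -> requested allocation
        self.thresholds = {}        # writer id -> scaled allocation
        self.lock_acquisitions = 0  # how often any thread took the lock

    def add_writer(self, writer_id, allocation):
        with self._lock:
            self.lock_acquisitions += 1
            if writer_id in self._writers:
                raise ValueError(f"writer registered twice: {writer_id}")
            self._writers[writer_id] = allocation
            self._update_allocation()

    def remove_writer(self, writer_id):
        with self._lock:
            self.lock_acquisitions += 1
            self._writers.pop(writer_id, None)
            if self._writers:
                self._update_allocation()

    def _update_allocation(self):
        # Runs while the lock is held and touches every registered writer.
        total = sum(self._writers.values())
        scale = 1.0 if total <= self._pool_bytes else self._pool_bytes / total
        self.thresholds = {w: int(a * scale) for w, a in self._writers.items()}


def run_task(manager, task_id, partitions, allocation):
    # One writer open at a time per task, one writer per partition value.
    for partition in range(partitions):
        writer_id = (task_id, partition)
        manager.add_writer(writer_id, allocation)
        manager.remove_writer(writer_id)


def simulate(tasks, partitions, pool_bytes=1 << 30, allocation=128 << 20):
    manager = ToyMemoryManager(pool_bytes)
    threads = [
        threading.Thread(target=run_task, args=(manager, t, partitions, allocation))
        for t in range(tasks)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return manager


if __name__ == "__main__":
    manager = simulate(tasks=200, partitions=28)
    print(manager.lock_acquisitions)  # 200 * 28 * 2 = 11200
```

Under these assumptions, every part file costs two trips through the shared lock, so 5,600 files mean 11,200 serialized calls. The model does not reproduce the blocked time seen in the profiles; it only shows why every writer thread has to queue on the same monitor.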
      

      During the 10-minute profiling session, the worker threads spent most of their time waiting on this lock. The writer implementation comes from the Spark source:

      /**
       * Dynamic partition writer with single writer, meaning only one writer is opened at any time for
       * writing. The records to be written are required to be sorted on partition and/or bucket
       * column(s) before writing.
       */
      class DynamicPartitionDataSingleWriter(
      

      It appears that the combination of the large number of writers created via Spark's DynamicPartitionDataSingleWriter and the synchronization bottleneck in MemoryManager drastically reduces performance by starving the writer threads. We validated this by removing the partitioning: the job is then much faster and fully utilizes the VM.
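
For anyone who wants to repeat the comparison, a minimal sketch of the two writes might look like the following. The helper names `write_kwargs` and `timed_write` and the output paths are hypothetical; the only Spark call used is `DataFrameWriter.parquet` with the same arguments as in the snippet at the top of this description, and `df` is assumed to be an existing DataFrame with a `year` column.

```python
import time


def write_kwargs(path, partition_col=None):
    """Build the arguments for df.write.parquet, with or without partitioning."""
    kwargs = {"path": path, "compression": "snappy", "mode": "overwrite"}
    if partition_col is not None:
        kwargs["partitionBy"] = partition_col
    return kwargs


def timed_write(df, path, partition_col=None):
    """Write df as parquet and return the wall-clock duration in seconds."""
    start = time.perf_counter()
    df.write.parquet(**write_kwargs(path, partition_col))
    return time.perf_counter() - start


# Usage, assuming `df` already exists (paths are placeholders):
# partitioned_s = timed_write(df, "/tmp/out_partitioned", partition_col="year")
# flat_s = timed_write(df, "/tmp/out_flat")
```

Wall-clock time alone does not prove the lock is the cause; pairing each run with a JFR lock profile, as described above, gives a more direct comparison.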

            People

              Assignee: Unassigned
              Reporter: Rafal Wojdyla (ravwojdyla)
              Votes: 1
              Watchers: 4
