PARQUET-2412: Excessive synchronization in MemoryManager


Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.14.0
    • Fix Version/s: None
    • Component/s: parquet-hadoop
    • Labels: None

    Description

      Issue originally reported in Spark: https://issues.apache.org/jira/browse/SPARK-44003

      We have a pyspark job that writes to a partitioned parquet dataset via:

      df.write.parquet(
        path=path,
        compression="snappy",
        mode="overwrite",
        partitionBy="year",
      )
      

      In this specific production case we partition by 28 distinct years, so 28 directories, each with 200 part files, 5,600 files in total. This particular job runs on a single dedicated, ephemeral VM. We have noticed that most of the time the VM is far from saturated and the job is very slow; it is neither I/O nor CPU bound. Here's an annotated VM utilization graph. The blue line is CPU and the turquoise line is memory. The graph doesn't show I/O, but we monitored that separately and it was not saturated either. Regarding the labels:

      • BQ: can be ignored
      • SPARK~1: Spark computes some data
      • SPARK~2: first slow period
      • SPARK~3: second slow period

      We took two 10-minute JFR profiles, marked P-1 and P-2 in the graph above: P-1 falls entirely within SPARK~2, while P-2 is partially in SPARK~2 but mostly in SPARK~3. Here's the P-1 profile, and here's the P-2 profile.

      The picture becomes clearer when we look at the locks; here's the lock report. The threads were blocked on locks for a total of 20.5 hours, almost entirely on the global org.apache.parquet.hadoop.MemoryManager, which has two synchronized methods: addWriter and removeWriter. From the parquet-mr source on GitHub:

        /**
         * Add a new writer and its memory allocation to the memory manager.
         * @param writer the new created writer
         * @param allocation the requested buffer size
         */
        synchronized void addWriter(InternalParquetRecordWriter<?> writer, Long allocation) {
          Long oldValue = writerList.get(writer);
          if (oldValue == null) {
            writerList.put(writer, allocation);
          } else {
            throw new IllegalArgumentException("[BUG] The Parquet Memory Manager should not add an " +
                "instance of InternalParquetRecordWriter more than once. The Manager already contains " +
                "the writer: " + writer);
          }
          updateAllocation();
        }
      
        /**
         * Remove the given writer from the memory manager.
         * @param writer the writer that has been closed
         */
        synchronized void removeWriter(InternalParquetRecordWriter<?> writer) {
          writerList.remove(writer);
          if (!writerList.isEmpty()) {
            updateAllocation();
          }
        }
      

      During the 10-minute profiling sessions, the worker threads spent most of their time waiting on this lock.
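      To illustrate the locking pattern (not the exact magnitude of the slowdown), here is a minimal standalone Java sketch. The class and constants are hypothetical and only mimic the shape of the synchronized addWriter/removeWriter methods quoted above, where every writer open/close from every thread funnels through a single monitor:

        import java.util.HashMap;
        import java.util.Map;

        public class GlobalLockContentionDemo {

          // Mirrors the locking shape of MemoryManager: both methods synchronize
          // on the same instance and do their bookkeeping while holding the lock.
          static class SharedManager {
            private final Map<Object, Long> writerList = new HashMap<>();

            synchronized void addWriter(Object writer, long allocation) {
              writerList.put(writer, allocation);
              updateAllocation();
            }

            synchronized void removeWriter(Object writer) {
              writerList.remove(writer);
              if (!writerList.isEmpty()) {
                updateAllocation();
              }
            }

            // Stand-in for the rebalancing work done while the lock is held.
            private void updateAllocation() {
              long requested = 0;
              for (long allocation : writerList.values()) {
                requested += allocation;
              }
            }
          }

          public static void main(String[] args) throws InterruptedException {
            SharedManager manager = new SharedManager();
            int threads = 16;            // concurrent writer tasks
            int writersPerThread = 350;  // ~5.6K writer open/close cycles overall

            Runnable task = () -> {
              for (int i = 0; i < writersPerThread; i++) {
                Object writer = new Object();            // stand-in for a record writer
                manager.addWriter(writer, 128L << 20);   // e.g. 128 MB requested
                manager.removeWriter(writer);            // both calls contend on one lock
              }
            };

            long start = System.nanoTime();
            Thread[] pool = new Thread[threads];
            for (int t = 0; t < threads; t++) {
              pool[t] = new Thread(task);
              pool[t].start();
            }
            for (Thread t : pool) {
              t.join();
            }
            System.out.printf("done in %d ms%n", (System.nanoTime() - start) / 1_000_000);
          }
        }

      Attaching a profiler to such a run should show the threads parking on the single SharedManager monitor, which matches the pattern visible in the P-1/P-2 lock reports.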

      It appears that the combination of the large number of writers created via Spark's DynamicPartitionDataSingleWriter and the MemoryManager synchronization bottleneck drastically reduces performance by starving the writer threads.
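      For discussion purposes only, here is one hypothetical direction (not a proposed patch; the class and constant names below are made up): keep writer registration lock-free with a ConcurrentHashMap and coalesce the rebalancing pass, so a burst of writer open/close calls pays for at most one recalculation per interval rather than one per call, at the cost of allocations being briefly stale:

        import java.util.Map;
        import java.util.concurrent.ConcurrentHashMap;
        import java.util.concurrent.atomic.AtomicLong;

        class CoalescingMemoryManager {
          // Illustrative value; the real trade-off between staleness and
          // contention would need measurement.
          private static final long MIN_REBALANCE_INTERVAL_NANOS = 10_000_000L; // 10 ms

          private final Map<Object, Long> writerList = new ConcurrentHashMap<>();
          private final AtomicLong lastRebalance = new AtomicLong();

          void addWriter(Object writer, Long allocation) {
            if (writerList.putIfAbsent(writer, allocation) != null) {
              throw new IllegalArgumentException("[BUG] writer registered twice: " + writer);
            }
            maybeRebalance();
          }

          void removeWriter(Object writer) {
            writerList.remove(writer);
            if (!writerList.isEmpty()) {
              maybeRebalance();
            }
          }

          private void maybeRebalance() {
            long now = System.nanoTime();
            long last = lastRebalance.get();
            // Only one caller per interval wins the CAS and pays for the full pass;
            // everyone else returns immediately instead of queueing on a monitor.
            if (now - last >= MIN_REBALANCE_INTERVAL_NANOS && lastRebalance.compareAndSet(last, now)) {
              rebalance();
            }
          }

          private synchronized void rebalance() {
            long requested = 0;
            for (long allocation : writerList.values()) {
              requested += allocation;
            }
            // ... scale each writer's row-group size threshold from `requested` here ...
          }
        }

      Whether a scheme like this still provides the allocation guarantees the MemoryManager is meant to enforce would need to be evaluated separately.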


            People

              Assignee: Unassigned
              Reporter: Rafal Wojdyla (ravwojdyla)
              Votes: 0
              Watchers: 3
