Spark / SPARK-27526

Driver OOM error occurs while writing parquet file with Append mode


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.1.1
    • Fix Version/s: None
    • Component/s: Input/Output, SQL
    • Environment: centos6.7

    Description

      Consider the user code below:

      someDataFrame.write
        .mode(SaveMode.Append)
        .partitionBy(somePartitionKeySeqs: _*)
        .parquet(targetPath)
      

      When Spark writes Parquet files into HDFS with SaveMode.Append, it checks that the
      requested partition columns match those of the files that already exist under
      targetPath. However, this check caches the info of every leaf file under targetPath,
      which can easily trigger a driver OOM when there are too many files under that path.
      The check is useful when exact correctness is needed, but it should be optional so
      that the OOM can be avoided.
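
      Until such an option exists, one workaround that bounds the listing is to append
      into the concrete partition directory and drop the partition column from the data,
      so Spark only lists files under that sub-path instead of the whole targetPath. A
      minimal sketch, assuming a single partition column named dt (all names here are
      illustrative, not from the original report):

      import org.apache.spark.sql.{SaveMode, SparkSession}

      val spark = SparkSession.builder().appName("append-workaround").getOrCreate()
      import spark.implicits._

      // Hypothetical example data; dt is the partition column.
      val someDataFrame = Seq(("a", 1, "2019-04-20"), ("b", 2, "2019-04-20"))
        .toDF("key", "value", "dt")

      // Append one partition at a time into its concrete directory, dropping the
      // partition column from the rows, so the pre-write check only lists files
      // under targetPath/dt=2019-04-20 rather than the whole targetPath.
      someDataFrame
        .where($"dt" === "2019-04-20")
        .drop("dt")
        .write
        .mode(SaveMode.Append)
        .parquet("targetPath/dt=2019-04-20")

      Note this only shrinks the listing to one partition directory; the check itself
      still runs.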

      The relevant code is here:

      //package org.apache.spark.sql.execution.datasources
      //case class DataSource
      
      private def writeInFileFormat(format: FileFormat, mode: SaveMode, data: DataFrame): Unit = {
      ...
      // Can we make this check optional?
      if (mode == SaveMode.Append) {
        val existingPartitionColumns = Try {
          // getOrInferFileFormatSchema(format, justPartitioning = true) may cause an
          // OOM when there are too many files. Could we sample a limited number of
          // files rather than listing all existing ones?
          getOrInferFileFormatSchema(format, justPartitioning = true)._2.fieldNames.toList
        }.getOrElse(Seq.empty[String])
      
        val sameColumns =
          existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
        if (existingPartitionColumns.nonEmpty && !sameColumns) {
          throw new AnalysisException(
            s"""Requested partitioning does not match existing partitioning.
               |Existing partitioning columns:
               |  ${existingPartitionColumns.mkString(", ")}
               |Requested partitioning columns:
               |  ${partitionColumns.mkString(", ")}
               |""".stripMargin)
        }
      }
      ...
      }
      
      
      private def getOrInferFileFormatSchema(
          format: FileFormat,
          justPartitioning: Boolean = false): (StructType, StructType) = {
        lazy val tempFileIndex = {
          val allPaths = caseInsensitiveOptions.get("path") ++ paths
          val hadoopConf = sparkSession.sessionState.newHadoopConf()
          val globbedPaths = allPaths.toSeq.flatMap { path =>
            val hdfsPath = new Path(path)
            val fs = hdfsPath.getFileSystem(hadoopConf)
            val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
            SparkHadoopUtil.get.globPathIfNecessary(qualified)
          }.toArray
          // InMemoryFileIndex.refresh0() caches the status of all leaf files: OOM risk.
          new InMemoryFileIndex(sparkSession, globbedPaths, options, None)
        }
      ...
      }
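
      A sketch of how the check could be made optional inside writeInFileFormat, gated
      behind a configuration flag (the flag name spark.sql.sources.append.checkPartitionColumns
      is hypothetical; no such conf exists in Spark today):

      // Hypothetical sketch only, illustrating the proposal above; it reuses the
      // sparkSession and mode values already in scope in writeInFileFormat.
      val checkExistingPartitions = sparkSession.sessionState.conf
        .getConfString("spark.sql.sources.append.checkPartitionColumns", "true")
        .toBoolean

      if (mode == SaveMode.Append && checkExistingPartitions) {
        // ... existing partition-column comparison as above ...
      }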
      

       

       

       


          People

            Assignee: Unassigned
            Reporter: senyoung
