SPARK-27030

DataFrameWriter.insertInto fails when writing in parallel to a Hive table


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.4.0
    • Fix Version/s: None
    • Component/s: SQL

    Description

      When writing to a Hive table, Spark uses the following temp directory:

      /path/to/table/_temporary/0/

      The trailing 0 comes from the config "mapreduce.job.application.attempt.id";
      since that config is not set, the committer falls back to 0.
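
      For reference, a hedged sketch of how that path is derived (paraphrased from
      Hadoop's FileOutputCommitter, not the actual source; the output path is illustrative):

      import org.apache.hadoop.conf.Configuration
      import org.apache.hadoop.fs.Path

      val conf = new Configuration()
      val outputPath = new Path("/path/to/table")
      // getInt falls back to 0 when the property is unset, which is why every
      // concurrent job ends up writing under the same .../_temporary/0 directory
      val attemptId = conf.getInt("mapreduce.job.application.attempt.id", 0)
      val jobAttemptPath = new Path(new Path(outputPath, "_temporary"), attemptId.toString)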

      When two processes write to the same table, the following race condition can occur:

      1. p1 creates the temp folder and writes into it
      2. p2 writes into the same temp folder
      3. p1 finishes and deletes the temp folder
      4. p2 fails because the temp folder is missing
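
      The sequence can be sketched without Spark at all (illustrative only; in reality
      the directories are created and removed by the output committer):

      import java.nio.file.{Files, Paths}

      val temp = Paths.get("/tmp/table/_temporary/0")         // shared by both writers
      Files.createDirectories(temp)                           // 1. p1 creates the temp folder
      val f = Files.createTempFile(temp, "part-", ".parquet") // 2. p2 writes a task file into it
      Files.delete(f)                                         // 3. p1 finishes: its recursive
      Files.delete(temp)                                      //    cleanup removes the whole folder
      Files.list(temp)                                        // 4. p2's commit throws NoSuchFileException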

       

      It is possible to reproduce this error locally with the following code
      (the code runs locally, but I hit the same error on a cluster
      with 2 jobs writing to the same table):

      import org.apache.spark.sql.SaveMode
      import org.apache.spark.sql.functions._
      import scala.collection.parallel.ForkJoinTaskSupport
      import scala.concurrent.forkjoin.ForkJoinPool

      val df = spark
        .range(1000)
        .toDF("a")
        .withColumn("partition", lit(0))
        .cache()

      // create db
      spark.sql("CREATE DATABASE IF NOT EXISTS db")

      // create table
      df.write
        .partitionBy("partition")
        .saveAsTable("db.table")

      // insert into different partitions in parallel
      val x = (1 to 100).par
      x.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(10))

      x.foreach { p =>
        val df2 = df.withColumn("partition", lit(p))
        df2.write
          .mode(SaveMode.Overwrite)
          .insertInto("db.table")
      }

      The resulting error is:

      java.io.FileNotFoundException: File file:/path/to/warehouse/db.db/table/_temporary/0 does not exist
       at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:406)
       at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1497)
       at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1537)
       at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:669)
       at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1497)
       at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1537)
       at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:283)
       at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:325)
       at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
       at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:166)
       at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:185)
       at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
       at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
       at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
       at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
       at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
       at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
       at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
       at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
       at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
       at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
       at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
       at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
       at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
       at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
       at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
       at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
       at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
       at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
       at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:325)
       at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:311)
       at company.name.spark.hive.SparkHiveUtilsTest$$anonfun$3$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(SparkHiveUtilsTest.scala:190)
       at scala.collection.parallel.immutable.ParRange$ParRangeIterator.foreach(ParRange.scala:91)
       at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
       at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
       at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
       at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
       at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
       at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
       at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:159)
       at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
       at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
       at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
       at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
       at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
       at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
       at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

       

      A possible workaround I found is to set the config "mapreduce.job.application.attempt.id"
      to a random integer in every job's SparkConf, so that each job writes to a different temp
      path. This does not help when the jobs share a single SparkContext, though, since the
      config is then the same for all of them.
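
      As a sketch, the workaround can be applied per job through SparkConf's
      "spark.hadoop." prefix (entries with that prefix are copied into the job's
      Hadoop configuration); the random id here is illustrative:

      import org.apache.spark.SparkConf
      import scala.util.Random

      // give each job its own attempt id so it writes under its own
      // .../_temporary/<attemptId> directory instead of the shared .../0
      val conf = new SparkConf()
        .set("spark.hadoop.mapreduce.job.application.attempt.id",
             Random.nextInt(Int.MaxValue).toString)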

       

       


People

    Assignee: Unassigned
    Reporter: Lev Katzav
