Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Incomplete
Affects Version/s: 2.4.0
Fix Version/s: None
Description
When writing to a Hive table, the following temp directory is used:
/path/to/table/_temporary/0/
(The trailing 0 comes from the config "mapreduce.job.application.attempt.id"; since that config is missing, it falls back to 0.)
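For reference, here is a simplified sketch (illustrative only, not the actual Hadoop source) of how Hadoop's FileOutputCommitter derives that job attempt path; the fallback to 0 is why every job without an application attempt id ends up sharing the same directory:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Simplified sketch of FileOutputCommitter's job attempt path logic.
def jobAttemptPath(outputPath: Path, conf: Configuration): Path = {
  // Falls back to 0 when the config is absent, so all jobs that lack an
  // application attempt id share <table>/_temporary/0.
  val appAttemptId = conf.getInt("mapreduce.job.application.attempt.id", 0)
  new Path(new Path(outputPath, "_temporary"), appAttemptId.toString)
}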
When two processes write to the same table, the following race condition can occur:
- p1 creates the temp folder and uses it
- p2 uses the same temp folder
- p1 finishes and deletes the temp folder
- p2 fails because the temp folder is missing
This error can be reproduced locally with the following code (the code runs locally, but I experienced the same error on a cluster with 2 jobs writing to the same table):
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._

import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

val df = spark
  .range(1000)
  .toDF("a")
  .withColumn("partition", lit(0))
  .cache()

// create db
sqlContext.sql("CREATE DATABASE IF NOT EXISTS db").count()

// create table
df.write
  .partitionBy("partition")
  .saveAsTable("db.table")

val x = (1 to 100).par
x.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(10))

// insert into different partitions in parallel
x.foreach { p =>
  val df2 = df.withColumn("partition", lit(p))
  df2.write
    .mode(SaveMode.Overwrite)
    .insertInto("db.table")
}
The error is:
java.io.FileNotFoundException: File file:/path/to/warehouse/db.db/table/_temporary/0 does not exist
  at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:406)
  at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1497)
  at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1537)
  at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:669)
  at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1497)
  at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1537)
  at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:283)
  at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:325)
  at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
  at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:166)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:185)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
  at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:325)
  at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:311)
  at company.name.spark.hive.SparkHiveUtilsTest$$anonfun$3$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(SparkHiveUtilsTest.scala:190)
  at scala.collection.parallel.immutable.ParRange$ParRangeIterator.foreach(ParRange.scala:91)
  at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
  at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
  at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
  at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
  at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
  at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
  at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:159)
  at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
  at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
  at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
  at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
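For context, the failing call in the trace is FileOutputCommitter.commitJob listing the shared job attempt directory. A minimal sketch of that listing (assuming a local FileSystem; the path is illustrative):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// commitJob gathers committed task outputs by listing _temporary/0;
// if the other job already deleted that directory, this listStatus
// throws the FileNotFoundException shown above.
val fs = FileSystem.getLocal(new Configuration())
val jobAttemptPath = new Path("/path/to/warehouse/db.db/table/_temporary/0")
val committed = fs.listStatus(jobAttemptPath) // fails once p1 has deleted the dir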
A possible workaround I found is to set the config "mapreduce.job.application.attempt.id" to a random integer per job in SparkConf, so that each job writes to a different temp path. This does not help, however, when a single Spark context is shared, since the config is then the same for every write.
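A sketch of that workaround (the "spark.hadoop." prefix is how SparkConf entries are propagated into the Hadoop Configuration; the random id here is an assumption of this sketch, not a tested value):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import scala.util.Random

// Give each job its own attempt id so its committer writes under a
// distinct _temporary/<id> directory. With a single shared SparkContext
// all writes still see the same value, so this does not help there.
val conf = new SparkConf()
  .set("spark.hadoop.mapreduce.job.application.attempt.id",
       Random.nextInt(Int.MaxValue).toString)
val spark = SparkSession.builder().config(conf).getOrCreate()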