SPARK-18512

FileNotFoundException on _temporary directory with Spark Streaming 2.0.1 and S3A

    Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 2.0.1
    • Fix Version/s: None
    • Component/s: Structured Streaming
    • Labels:
      None
    • Environment:

      AWS EMR 5.0.1
      Spark 2.0.1
      S3 EU-West-1 (S3A)

      Description

      After a few hours of streaming processing and saving data in Parquet format, I always get this exception:

      java.io.FileNotFoundException: No such file or directory: s3a://xxx/_temporary/0/task_xxxx
      	at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1004)
      	at org.apache.hadoop.fs.s3a.S3AFileSystem.listStatus(S3AFileSystem.java:745)
      	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:426)
      	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:362)
      	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:334)
      	at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:46)
      	at org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:222)
      	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelationCommand.scala:144)
      	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115)
      	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115)
      	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
      	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:115)
      	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:60)
      	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:58)
      	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
      	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
      	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
      	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
      	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
      	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
      	at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:510)
      	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
      	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:194)
      	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:488)
      

      I've also tried s3:// and s3n://, but it always happens after 3-5 hours.
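
      The report doesn't include the job itself, so here is a hypothetical minimal sketch of the kind of application described: a streaming job whose micro-batches are appended as Parquet to an s3a:// path. The source, schema, and bucket name are illustrative placeholders, not the reporter's code.

      import org.apache.spark.sql.SparkSession
      import org.apache.spark.streaming.{Seconds, StreamingContext}

      object StreamToS3aParquet {
        def main(args: Array[String]): Unit = {
          val spark = SparkSession.builder().appName("stream-to-s3a").getOrCreate()
          import spark.implicits._

          val ssc = new StreamingContext(spark.sparkContext, Seconds(60))
          val lines = ssc.socketTextStream("localhost", 9999) // placeholder source

          lines.foreachRDD { rdd =>
            // Each micro-batch commit goes through FileOutputCommitter against S3A,
            // which is where the _temporary listing eventually fails.
            rdd.toDF("value").write.mode("append").parquet("s3a://xxx/output")
          }

          ssc.start()
          ssc.awaitTermination()
        }
      }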


          Activity

          zsxwing Shixiong Zhu added a comment -

          Did you enable speculation?

          giuseppe.bonaccorso Giuseppe Bonaccorso added a comment -

          No, I didn't. I'm afraid that relaunching a streaming task can cause a loss of data.

          uncleGen Genmao Yu added a comment -

          Giuseppe Bonaccorso How can I reproduce this failure? Can you provide a code snippet?

          stevel@apache.org Steve Loughran added a comment -

          This looks like a consistency problem; S3 listing always lags the creation/deletion/update of contents.

          The committer has listed paths to merge in, then gone through each one to see its type: if it's not a file, it lists the subdirectory and, interestingly, gets an exception at this point. Maybe the first listing found an object which is no longer there by the time the second listing went through; that is, the exception isn't a delay-on-create, it's a delay-on-delete.

          Create-listing delays could be handled in the committer by retrying on an FNFE; it'd slightly increase the time before a failure, but as that's a failure path, it's not too serious. Delete delays could be addressed the opposite way: ignore the problem, on the basis that if the listing failed, there's no file to rename. That's more worrying, as it's a sign of a problem which could have implications further up the commit process: things are changing in the listing of files being renamed.

          HADOOP-13345 is going to address list inconsistency; I'm doing a committer there which I could also try to make more robust even when not using a DynamoDB-backed bucket. The question is: what is a good retry policy here, especially given that once an inconsistency has surfaced, a large amount of the merge may already have taken place? Backing up and retrying may be dangerous in a different way.

          One thing I would recommend trying is: commit to HDFS, then copy. Do that and you can turn speculation on in your executors, get the local virtual HDD performance and networking, as well as a consistent view. Copy to S3A after all the work you want done is complete.
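
          As a minimal sketch of that "commit to HDFS, then copy" pattern (the paths and bucket name below are placeholders, not taken from this issue):

            import org.apache.hadoop.fs.{FileUtil, Path}
            import org.apache.spark.sql.{DataFrame, SparkSession}

            def writeViaHdfs(spark: SparkSession, df: DataFrame): Unit = {
              val hdfsStaging = new Path("hdfs:///staging/output") // consistent store for the commit
              val s3Target    = new Path("s3a://my-bucket/output") // eventual destination

              // 1. Commit the Parquet output to HDFS, where listings are consistent
              //    and renames are cheap; speculation can stay enabled here.
              df.write.mode("append").parquet(hdfsStaging.toString)

              // 2. Copy the committed files to S3A only once the write has fully succeeded.
              val conf   = spark.sparkContext.hadoopConfiguration
              val srcFs  = hdfsStaging.getFileSystem(conf)
              val destFs = s3Target.getFileSystem(conf)
              FileUtil.copy(srcFs, hdfsStaging, destFs, s3Target,
                false /* deleteSource */, true /* overwrite */, conf)
            }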

          stevel@apache.org Steve Loughran added a comment - edited

          One question: what's the size of the data being committed here?

          And a suggestion: use algorithm 2 for committing files if you aren't already; I think from the stack trace you may be using the v1 algorithm, which has more renames than it needs.

          Here are the options I have for reading ORC and Parquet, and for output work:

            private val ORC_OPTIONS = Map(
              "spark.hadoop.orc.splits.include.file.footer" -> "true",
              "spark.hadoop.orc.cache.stripe.details.size" -> "1000",
              "spark.hadoop.orc.filterPushdown" -> "true")
          
            private val PARQUET_OPTIONS = Map(
              "spark.sql.parquet.mergeSchema" -> "false",
              "spark.sql.parquet.filterPushdown" -> "true")
          
            private val MAPREDUCE_OPTIONS = Map(
              "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version" -> "2",
              "spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored" -> "true" )
          
          giuseppe.bonaccorso Giuseppe Bonaccorso added a comment -

          It happens after about 1,000,000 writes of small objects (100-1000 KB each).
          I've tried your suggestion of writing to HDFS and it works. Unfortunately I cannot use distcp on the same cluster, because we're working with Spark Streaming. Is it possible to do it on the same cluster without distcp? Thanks

          giuseppe.bonaccorso Giuseppe Bonaccorso added a comment -

          Thanks! I've already set mergeSchema=false, but I'm going to add the filterPushdown and mapreduce options.

          stevel@apache.org Steve Loughran added a comment -

          No. What you are seeing is eventual consistency surfacing: the listing shows files which aren't there, and the committer expects consistency between the files listed and the files that exist. HADOOP-13786 covers the development of an output committer explicitly for S3 + DynamoDB; it's the latter which offers the consistency. I could try to make it more robust against a file in the listing not actually being present, though: downgrade from an error to a warning... though there's a risk that this would hide actual data loss.

          stevel@apache.org Steve Loughran added a comment -

          Actually, this is the problem which MAPREDUCE-6478 deals with surfacing; that's the cleanup-failures.ignored option. However, it's not shipping in Hadoop 2.7.x; it will be out in Hadoop 2.8. I do not know what is in AWS EMR, or its roadmap, so I cannot comment on whether setting the flag currently does anything. Sorry.

          stevel@apache.org Steve Loughran added a comment -

          Of course, if you do switch to EMRFS, you should get that consistent view... can you try that and update the JIRA on whether it makes the problem go away?

          abridgett Adrian Bridgett added a comment -

          So currently, with Spark 2, there's no sensible way to write to S3? (Think of this as a question, not a rant!) That is, there's no way to avoid either the S3 rename latency problems or this issue, unless you use EMRFS or, e.g., write to HDFS first and distcp the files over?

          I wonder if a backport of MAPREDUCE-6478 to hadoop-2.7.x is on the cards (hadoop-2.8.x is presumably a while away from production readiness).

          stevel@apache.org Steve Loughran added a comment -

          ah, the "when will 2.8 ship" question. Really close, Jun Ping owns the problem, and the s3a performance work has been helping slow it down (or at least meaning I've not been pushing for a release).

          I am doing a (better, faster) comitter which will be deterministic with dynamodb backing. Without that, writing to any eventually consistent store is danger, because spark makes some fundamental assumptions about filesystems "if a file says it is there in a listing, and of a given size", then you can open it and read in that much data. Similarly, there's the assumption that after you delete something, they aren't visible any more.

          Anyway, you could ask on the MR JIRA about getting that backported to Hadoop 2.7.x; I don't see any fundamental reason to veto it: no risk of breaking things if the option isn't set

          abridgett Adrian Bridgett added a comment -

          Thanks Steve - I've just seen "next few weeks" mentioned on hadoop-common-dev. TBH I can't quite see how the cleanup-failures.ignored patch is going to help in the mergePaths case, so I'm looking at just re-running our jobs if they hit problems (and getting some idea of how often that's going to happen). S3 is a big cost saver for us vs running HDFS.

          stevel@apache.org Steve Loughran added a comment -

          It'd be good to get some more details from people who see this, especially if they have commit algorithm = 2, which is the one with reduced renames. That is: collect your stack traces and see what specific conditions trigger it. If it's that the "_temporary" directory has been deleted and yet it still appeared in the listing for the merge, well, maybe we should have special handling for the case that it has vanished. For example, in the committer, build a map of which directories have been deleted by that instance, and hence which it doesn't have to worry about.

          (That's not necessarily going to work, as the committers run across the cluster; it's why DynamoDB is needed.)

          mathieude Mathieu DESPRIEE added a comment - edited

          I'm experiencing a very similar problem, but we're using HDFS, not S3, and it's not a streaming app.
          Like Giuseppe, we usually see this after some time under heavy load.

          16/12/15 11:25:18 ERROR InsertIntoHadoopFsRelationCommand: Aborting job.
          java.io.FileNotFoundException: File hdfs://nameservice1/user/xdstore/rfs/rfsDB/_temporary/0 does not exist.
                  at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:795)
                  at org.apache.hadoop.hdfs.DistributedFileSystem.access$700(DistributedFileSystem.java:106)
                  at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:853)
                  at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:849)
                  at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
                  at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:860)
                  at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1517)
                  at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1557)
                  at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:291)
                  at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:361)
                  at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:334)
                  at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:46)
                  at org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:222)
                  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelationCommand.scala:144)
                  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115)
                  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply(InsertIntoHadoopFsRelationCommand.scala:115)
                  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
                  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:115)
                  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
                  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
                  at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
                  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
                  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
                  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
                  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
                  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
                  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
                  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86)
                  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86)
                  at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:525)
                  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
                  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:194)
                  at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:488)
                  at com.bluedme.woda.ng.indexer.RfsRepository.append(RfsRepository.scala:36)
                  at com.bluedme.woda.ng.indexer.RfsRepository.insert(RfsRepository.scala:23)
                  at com.bluedme.woda.cmd.ShareDatasetImpl.runImmediate(ShareDatasetImpl.scala:33)
                  at com.bluedme.woda.cmd.ShareDatasetImpl.runImmediate(ShareDatasetImpl.scala:13)
                  at com.bluedme.woda.cmd.ImmediateCommandImpl$$anonfun$run$1.apply(CommandImpl.scala:21)
                  at com.bluedme.woda.cmd.ImmediateCommandImpl$$anonfun$run$1.apply(CommandImpl.scala:21)
                  at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
                  at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
                  at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
                  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
                  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
                  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
                  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
          

          We're on CDH 5.7, Hadoop 2.6.

          Should I file another JIRA?

          stevel@apache.org Steve Loughran added a comment -

          Yes, file a SPARK one and that can be a base for blame assignment.

          Looking at the trace, it's happening because the actual job attempt path is missing, which is generally considered traumatic. And you are following a codepath which only exists if you are using the v1 "all renames at the end" algorithm. Try the v2 algorithm, mapreduce.fileoutputcommitter.algorithm.version = 2, to see if that helps.
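
          For reference, a hypothetical way to set that from a Spark application (the property name is the one above; the builder wiring is an assumption):

            import org.apache.spark.sql.SparkSession

            val spark = SparkSession.builder()
              .appName("v2-commit-algorithm")
              // The "spark.hadoop." prefix puts the setting into the Hadoop Configuration,
              // so FileOutputCommitter uses the v2 (incremental rename) algorithm.
              .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
              .getOrCreate()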

          mathieude Mathieu DESPRIEE added a comment -

          SPARK-18883

            People

            • Assignee: Unassigned
            • Reporter: giuseppe.bonaccorso Giuseppe Bonaccorso
            • Votes: 2
            • Watchers: 10
