Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Duplicate
    • Affects Version/s: 2.0.0
    • Fix Version/s: None
    • Component/s: Input/Output
    • Labels: None
    • Environment: Hadoop version: 2.5.1

    Description

      When writing a Dataset to a CSV file, a java.io.FileNotFoundException is raised after the write has completed successfully.

      This behaviour does not occur when the Spark application is launched locally; it seems related to HDFS handling.

      Here is a test case:

      import org.apache.spark.SparkContext
      import org.apache.hadoop.fs.FileSystem
      import org.apache.hadoop.fs.{Path, PathFilter}
      import org.apache.spark.sql.SQLContext

      case class Test(A: String, B: String, C: String)

      object WriteTest {
        val sc: SparkContext = new SparkContext()
        val fs: FileSystem = FileSystem.get(sc.hadoopConfiguration)
        val sqlContext: SQLContext = new SQLContext(sc)

        import sqlContext.implicits._

        def main(args: Array[String]): Unit = {

          val ds = Seq(
            Test("abc", "abc", "abc"),
            Test("abc", "abc", "def"),
            Test("abc", "abc", "ghi"),
            Test("abc", "xyz", "abc"),
            Test("xyz", "xyz", "abc")
          ).toDS()

          // works: plain CSV write
          ds
            .write
            .option("header", true)
            .mode("overwrite")
            .csv("/tmp/test1.csv")

          // fails: the same write, partitioned by columns A and B
          ds
            .write
            .option("header", true)
            .mode("overwrite")
            .partitionBy("A", "B")
            .csv("/tmp/test2.csv")

        }
      }
      

      Here is the exception stack:

      java.io.FileNotFoundException: Path is not a file: /tmp/test2.csv/A=abc
      	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:68)
      	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:54)
      	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsUpdateTimes(FSNamesystem.java:1795)
      	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1738)
      	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1718)
      	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1690)
      	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:519)
      	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:337)
      	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
      	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
      	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
      	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2013)
      	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2009)
      	at java.security.AccessController.doPrivileged(Native Method)
      	at javax.security.auth.Subject.doAs(Subject.java:422)
      	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
      	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2007)
      
      	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
      	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
      	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
      	at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
      	at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
      	at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
      	at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1222)
      	at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1210)
      	at org.apache.hadoop.hdfs.DFSClient.getBlockLocations(DFSClient.java:1260)
      	at org.apache.hadoop.hdfs.DistributedFileSystem$1.doCall(DistributedFileSystem.java:220)
      	at org.apache.hadoop.hdfs.DistributedFileSystem$1.doCall(DistributedFileSystem.java:216)
      	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
      	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:216)
      	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:208)
      	at org.apache.spark.sql.execution.datasources.ListingFileCatalog$$anonfun$1$$anonfun$apply$2.apply(ListingFileCatalog.scala:104)
      	at org.apache.spark.sql.execution.datasources.ListingFileCatalog$$anonfun$1$$anonfun$apply$2.apply(ListingFileCatalog.scala:92)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
      	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
      	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
      	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
      	at org.apache.spark.sql.execution.datasources.ListingFileCatalog$$anonfun$1.apply(ListingFileCatalog.scala:92)
      	at org.apache.spark.sql.execution.datasources.ListingFileCatalog$$anonfun$1.apply(ListingFileCatalog.scala:80)
      	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
      	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
      	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
      	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
      	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
      	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
      	at org.apache.spark.sql.execution.datasources.ListingFileCatalog.listLeafFiles(ListingFileCatalog.scala:80)
      	at org.apache.spark.sql.execution.datasources.ListingFileCatalog.refresh(ListingFileCatalog.scala:69)
      	at org.apache.spark.sql.execution.datasources.ListingFileCatalog.<init>(ListingFileCatalog.scala:50)
      	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:307)
      	at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:424)
      	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:252)
      	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:234)
      	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:697)
      	at WriteTest$.main(write.scala:42)
      	at WriteTest.main(write.scala)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:483)
      	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:610)
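
      The stack suggests that ListingFileCatalog calls getFileBlockLocations on every entry it lists, including the partition directories; against HDFS that call is only valid for regular files. Here is a minimal sketch of the failing call (reusing the fs handle from the test code; the rest is an assumption):

      import org.apache.hadoop.fs.Path

      // Hypothetical standalone check against HDFS, reusing the fs handle
      // from the test code above.
      val dirStatus = fs.getFileStatus(new Path("/tmp/test2.csv/A=abc"))
      // On HDFS this throws java.io.FileNotFoundException: Path is not a file,
      // matching the stack above; on the local filesystem the same call simply
      // returns an empty array, which may explain why local runs do not fail.
      fs.getFileBlockLocations(dirStatus, 0, dirStatus.getLen)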
      

      Here is the result of hdfs dfs -ls /tmp/test2.csv:

      Found 3 items
      drwxr-xr-x   - hadoop supergroup          0 2016-06-21 14:59 /tmp/test2.csv/A=abc
      drwxr-xr-x   - hadoop supergroup          0 2016-06-21 14:59 /tmp/test2.csv/A=xyz
      -rw-r--r--   3 hadoop supergroup          0 2016-06-21 14:59 /tmp/test2.csv/_SUCCESS
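
      The part files themselves appear to have been written despite the exception; a recursive listing should show them (a sketch, again reusing fs from the test code):

      import org.apache.hadoop.fs.Path

      // Recursively list everything under the output directory, including the
      // part files inside the A=.../B=... partition subdirectories.
      val it = fs.listFiles(new Path("/tmp/test2.csv"), true)
      while (it.hasNext) {
        val status = it.next()
        println(s"${status.getPath} (${status.getLen} bytes)")
      }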
      
      

      I am not sure whether the bug comes from the HDFS implementation or from the way Spark uses it.

        Activity

          srowen Sean R. Owen added a comment -

          Romain Giot have a look at https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark first. Don't set Blocker; this doesn't seem nearly that important.

          You need to write an hdfs:// URI if you intend to reference an HDFS path.

          rgiot Romain Giot added a comment -

          OK, sorry for setting the bug's priority too high.
          Note that, on my configuration, the behaviour is exactly the same if I explicitly prepend the paths with hdfs://<server>:<port>.
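
          That prefixed form looks roughly like this (a sketch; <server>:<port> stands for the cluster's NameNode, as above):

          // the same partitioned write from the test code, with an explicit hdfs:// URI
          ds.write
            .option("header", true)
            .mode("overwrite")
            .partitionBy("A", "B")
            .csv("hdfs://<server>:<port>/tmp/test2.csv")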

          rgiot Romain Giot added a comment -

          This issue is very likely a duplicate of SPARK-14959.


          People

            Assignee: Unassigned
            Reporter: rgiot Romain Giot
            Votes: 0