Details
- Type: Bug
- Status: Resolved
- Priority: Minor
- Resolution: Duplicate
- Affects Version/s: 2.0.0
- Fix Version/s: None
- Component/s: None
- Environment: Hadoop version: 2.5.1
Description
When writing a Dataset to a CSV file, a java.io.FileNotFoundException is raised even though the write itself completes successfully.
This behaviour does not happen when the Spark application is launched locally; it is probably related to how HDFS is handled.
Here is a test case:
import org.apache.spark.SparkContext
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.spark.sql.SQLContext

case class Test(A: String, B: String, C: String)

object WriteTest {
  val sc: SparkContext = new SparkContext()
  val fs: FileSystem = FileSystem.get(sc.hadoopConfiguration)
  val sqlContext: SQLContext = new SQLContext(sc)
  import sqlContext.implicits._

  def main(args: Array[String]): Unit = {
    val ds = Seq(
      Test("abc", "abc", "abc"),
      Test("abc", "abc", "def"),
      Test("abc", "abc", "ghi"),
      Test("abc", "xyz", "abc"),
      Test("xyz", "xyz", "abc")
    ).toDS()

    // works
    ds.write
      .option("header", true)
      .mode("overwrite")
      .csv("/tmp/test1.csv")

    // fails
    ds.write
      .option("header", true)
      .mode("overwrite")
      .partitionBy("A", "B")
      .csv("/tmp/test2.csv")
  }
}
And here is the exception stack trace:
java.io.FileNotFoundException: Path is not a file: /tmp/test2.csv/A=abc
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:68)
    at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:54)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsUpdateTimes(FSNamesystem.java:1795)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1738)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1718)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1690)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:519)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:337)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2013)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2009)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2007)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
    at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
    at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
    at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1222)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1210)
    at org.apache.hadoop.hdfs.DFSClient.getBlockLocations(DFSClient.java:1260)
    at org.apache.hadoop.hdfs.DistributedFileSystem$1.doCall(DistributedFileSystem.java:220)
    at org.apache.hadoop.hdfs.DistributedFileSystem$1.doCall(DistributedFileSystem.java:216)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:216)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:208)
    at org.apache.spark.sql.execution.datasources.ListingFileCatalog$$anonfun$1$$anonfun$apply$2.apply(ListingFileCatalog.scala:104)
    at org.apache.spark.sql.execution.datasources.ListingFileCatalog$$anonfun$1$$anonfun$apply$2.apply(ListingFileCatalog.scala:92)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.spark.sql.execution.datasources.ListingFileCatalog$$anonfun$1.apply(ListingFileCatalog.scala:92)
    at org.apache.spark.sql.execution.datasources.ListingFileCatalog$$anonfun$1.apply(ListingFileCatalog.scala:80)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
    at org.apache.spark.sql.execution.datasources.ListingFileCatalog.listLeafFiles(ListingFileCatalog.scala:80)
    at org.apache.spark.sql.execution.datasources.ListingFileCatalog.refresh(ListingFileCatalog.scala:69)
    at org.apache.spark.sql.execution.datasources.ListingFileCatalog.<init>(ListingFileCatalog.scala:50)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:307)
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:424)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:252)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:234)
    at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:697)
    at WriteTest$.main(write.scala:42)
    at WriteTest.main(write.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:610)
And here is the result of hdfs dfs -ls /tmp/test2.csv:
Found 3 items
drwxr-xr-x   - hadoop supergroup          0 2016-06-21 14:59 /tmp/test2.csv/A=abc
drwxr-xr-x   - hadoop supergroup          0 2016-06-21 14:59 /tmp/test2.csv/A=xyz
-rw-r--r--   3 hadoop supergroup          0 2016-06-21 14:59 /tmp/test2.csv/_SUCCESS
I do not know whether the bug comes from the HDFS implementation or from the way Spark uses it.
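As an illustrative cross-check (not part of the original report), the Hadoop FileSystem API can confirm from the same application what the listing above already shows: the paths created by partitionBy are directories, which is consistent with the NameNode's "Path is not a file" error. A minimal sketch, reusing the fs handle from the test code:

// Illustrative sketch only: verify that a partition output path is a directory.
// getFileStatus and isDirectory are standard Hadoop FileSystem/FileStatus calls.
import org.apache.hadoop.fs.Path

val partitionStatus = fs.getFileStatus(new Path("/tmp/test2.csv/A=abc"))
println(s"isDirectory = ${partitionStatus.isDirectory}")  // expected: true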
Issue Links
- duplicates SPARK-14959 Problem Reading partitioned ORC or Parquet files (Resolved)
Comments
Romain Giot, have a look at https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark first. Don't set Blocker; this doesn't seem nearly that important.
You need to write an hdfs:// URI if you intend to reference an HDFS path.
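A minimal sketch of that suggestion, reusing the ds Dataset from the test code above; the NameNode host and port are placeholders, not values from this report:

// Hypothetical example: pass a fully qualified hdfs:// URI instead of a bare path.
// "namenode:8020" is a placeholder; alternatively, rely on fs.defaultFS and write
// hdfs:///tmp/test2.csv with an empty authority.
ds.write
  .option("header", true)
  .mode("overwrite")
  .partitionBy("A", "B")
  .csv("hdfs://namenode:8020/tmp/test2.csv")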