SPARK-14959

Problem reading partitioned ORC or Parquet files

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 2.0.0
    • Fix Version/s: 2.0.0
    • Component/s: SQL
    • Labels: None
    • Environment: Hadoop 2.7.1.2.4.0.0-169 (HDP 2.4)

    Description

      Hello,

      I have noticed in the past few days that there is an issue when trying to read partitioned files from HDFS.

      I am running Spark built from the master branch at commit c544356.

      The write actually lands the partitioned files on HDFS, but the save call itself throws, and reading the partitioned root path afterwards fails the same way.

      Issue Reproduction
      case class Data(id: Int, text: String)
      val ds = spark.createDataset(Seq(Data(0, "hello"), Data(1, "hello"), Data(0, "world"), Data(1, "there")))
      
      scala> ds.write.mode(org.apache.spark.sql.SaveMode.Overwrite).format("parquet").partitionBy("id").save("/user/spark/test.parquet")
      SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".                
      SLF4J: Defaulting to no-operation (NOP) logger implementation
      SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
      java.io.FileNotFoundException: Path is not a file: /user/spark/test.parquet/id=0
              at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:75)
              at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
              at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828)
              at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799)
              at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712)
              at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:652)
              at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
              at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
              at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
              at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
              at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2151)
              at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2147)
              at java.security.AccessController.doPrivileged(Native Method)
              at javax.security.auth.Subject.doAs(Subject.java:422)
              at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
              at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2145)
      
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
        at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
        at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1242)
        at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1227)
        at org.apache.hadoop.hdfs.DFSClient.getBlockLocations(DFSClient.java:1285)
        at org.apache.hadoop.hdfs.DistributedFileSystem$1.doCall(DistributedFileSystem.java:221)
        at org.apache.hadoop.hdfs.DistributedFileSystem$1.doCall(DistributedFileSystem.java:217)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:228)
        at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:209)
        at org.apache.spark.sql.execution.datasources.HDFSFileCatalog$$anonfun$9$$anonfun$apply$4.apply(fileSourceInterfaces.scala:372)
        at org.apache.spark.sql.execution.datasources.HDFSFileCatalog$$anonfun$9$$anonfun$apply$4.apply(fileSourceInterfaces.scala:360)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
        at org.apache.spark.sql.execution.datasources.HDFSFileCatalog$$anonfun$9.apply(fileSourceInterfaces.scala:360)
        at org.apache.spark.sql.execution.datasources.HDFSFileCatalog$$anonfun$9.apply(fileSourceInterfaces.scala:348)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
        at org.apache.spark.sql.execution.datasources.HDFSFileCatalog.listLeafFiles(fileSourceInterfaces.scala:348)
        at org.apache.spark.sql.execution.datasources.HDFSFileCatalog.refresh(fileSourceInterfaces.scala:447)
        at org.apache.spark.sql.execution.datasources.HDFSFileCatalog.<init>(fileSourceInterfaces.scala:291)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:314)
        at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:431)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:246)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
        ... 48 elided
      Caused by: org.apache.hadoop.ipc.RemoteException: Path is not a file: /user/spark/test.parquet/id=0
              at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:75)
              at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
              at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828)
              at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799)
              at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712)
              at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:652)
              at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
              at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
              at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
              at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
              at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2151)
              at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2147)
              at java.security.AccessController.doPrivileged(Native Method)
              at javax.security.auth.Subject.doAs(Subject.java:422)
              at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
              at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2145)
      
        at org.apache.hadoop.ipc.Client.call(Client.java:1476)
        at org.apache.hadoop.ipc.Client.call(Client.java:1407)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
        at com.sun.proxy.$Proxy13.getBlockLocations(Unknown Source)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:255)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
        at com.sun.proxy.$Proxy14.getBlockLocations(Unknown Source)
        at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1240)
        ... 78 more
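      
      // The data itself lands on HDFS despite the exception above; a hypothetical
      // check via the Hadoop FileSystem API (illustration only, not from the report):
      scala> val fs = org.apache.hadoop.fs.FileSystem.get(spark.sparkContext.hadoopConfiguration)
      scala> fs.listStatus(new org.apache.hadoop.fs.Path("/user/spark/test.parquet")).foreach(s => println(s.getPath))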
      
      // Reading the specific partitioned data works 
      scala> spark.read.format("parquet").load("/user/spark/test.parquet/id=0").show
      +-----+                                                                        
      | text|
      +-----+
      |hello|
      |world|
      +-----+
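      
      // Possible workaround (untested here; relies on the standard "basePath" option
      // and the multi-path load(paths: String*) overload): list the partition
      // directories explicitly so partition discovery still recovers the "id" column
      scala> spark.read.option("basePath", "/user/spark/test.parquet").format("parquet").load("/user/spark/test.parquet/id=0", "/user/spark/test.parquet/id=1").show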
      
      // Reading all the partitions fails
      scala> spark.read.format("parquet").load("/user/spark/test.parquet").show
      java.io.FileNotFoundException: Path is not a file: /user/spark/test.parquet/id=0
              at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:75)
              at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
              at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828)
              at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799)
              at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712)
              at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:652)
              at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
              at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
              at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
              at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
              at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2151)
              at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2147)
              at java.security.AccessController.doPrivileged(Native Method)
              at javax.security.auth.Subject.doAs(Subject.java:422)
              at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
              at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2145)
      
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
        at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
        at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1242)
        at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1227)
        at org.apache.hadoop.hdfs.DFSClient.getBlockLocations(DFSClient.java:1285)
        at org.apache.hadoop.hdfs.DistributedFileSystem$1.doCall(DistributedFileSystem.java:221)
        at org.apache.hadoop.hdfs.DistributedFileSystem$1.doCall(DistributedFileSystem.java:217)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:228)
        at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:209)
        at org.apache.spark.sql.execution.datasources.HDFSFileCatalog$$anonfun$9$$anonfun$apply$4.apply(fileSourceInterfaces.scala:372)
        at org.apache.spark.sql.execution.datasources.HDFSFileCatalog$$anonfun$9$$anonfun$apply$4.apply(fileSourceInterfaces.scala:360)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
        at org.apache.spark.sql.execution.datasources.HDFSFileCatalog$$anonfun$9.apply(fileSourceInterfaces.scala:360)
        at org.apache.spark.sql.execution.datasources.HDFSFileCatalog$$anonfun$9.apply(fileSourceInterfaces.scala:348)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
        at org.apache.spark.sql.execution.datasources.HDFSFileCatalog.listLeafFiles(fileSourceInterfaces.scala:348)
        at org.apache.spark.sql.execution.datasources.HDFSFileCatalog.refresh(fileSourceInterfaces.scala:447)
        at org.apache.spark.sql.execution.datasources.HDFSFileCatalog.<init>(fileSourceInterfaces.scala:291)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:314)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:132)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:142)
        ... 48 elided
      Caused by: org.apache.hadoop.ipc.RemoteException: Path is not a file: /user/spark/test.parquet/id=0
              at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:75)
              at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
              at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1828)
              at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1799)
              at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1712)
              at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:652)
              at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
              at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
              at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
              at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
              at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2151)
              at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2147)
              at java.security.AccessController.doPrivileged(Native Method)
              at javax.security.auth.Subject.doAs(Subject.java:422)
              at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
              at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2145)
      
        at org.apache.hadoop.ipc.Client.call(Client.java:1476)
        at org.apache.hadoop.ipc.Client.call(Client.java:1407)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
        at com.sun.proxy.$Proxy13.getBlockLocations(Unknown Source)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:255)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
        at com.sun.proxy.$Proxy14.getBlockLocations(Unknown Source)
        at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1240)
        ... 77 more
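      
      Judging from both stack traces, HDFSFileCatalog.listLeafFiles hands the partition directories (id=0, id=1) straight to DistributedFileSystem.getFileBlockLocations, which the NameNode rejects for anything that is not a plain file. A minimal sketch of the guard that would avoid the call (an assumption about the shape of a fix, not the actual Spark patch):
      
      import org.apache.hadoop.fs.{BlockLocation, FileStatus, FileSystem, Path}
      
      // Recursively list leaf files; only plain files are passed to
      // getFileBlockLocations, while directories such as id=0 are recursed into.
      def listLeafFiles(fs: FileSystem, root: Path): Seq[(FileStatus, Array[BlockLocation])] =
        fs.listStatus(root).toSeq.flatMap { status =>
          if (status.isDirectory)
            listLeafFiles(fs, status.getPath)
          else
            Seq(status -> fs.getFileBlockLocations(status, 0, status.getLen))
        }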
      


People

    • Assignee: Xin Wu (xwu0226)
    • Reporter: Sebastian YEPES FERNANDEZ (syepes)
    • Votes: 2
    • Watchers: 13
