Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-24364

Files deletion after globbing may fail StructuredStreaming jobs

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.3.0, 2.4.0
    • Fix Version/s: 2.3.1, 2.4.0
    • Component/s: Structured Streaming
    • Labels:
      None

      Description

      This is related with SPARK-17599. SPARK-17599 checked the directory only but actually it can file on another FileSystem operation when the file is not found. For example see:

      Error occurred while processing: File does not exist: /rel/00171151/input/PJ/part-00136-b6403bac-a240-44f8-a792-fc2e174682b7-c000.csv
      	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
      	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
      	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2029)
      	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:2000)
      	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1913)
      	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:700)
      	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:377)
      	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
      	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
      	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
      	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
      	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
      	at java.security.AccessController.doPrivileged(Native Method)
      	at javax.security.auth.Subject.doAs(Subject.java:422)
      	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
      	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347)
      
      java.io.FileNotFoundException: File does not exist: /rel/00171151/input/PJ/part-00136-b6403bac-a240-44f8-a792-fc2e174682b7-c000.csv
      	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
      	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
      	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2029)
      	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:2000)
      	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1913)
      	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:700)
      	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:377)
      	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
      	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
      	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
      	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
      	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
      	at java.security.AccessController.doPrivileged(Native Method)
      	at javax.security.auth.Subject.doAs(Subject.java:422)
      	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
      	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347)
      
      	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
      	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
      	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
      	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
      	at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
      	at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
      	at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1251)
      	at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1236)
      	at org.apache.hadoop.hdfs.DFSClient.getBlockLocations(DFSClient.java:1294)
      	at org.apache.hadoop.hdfs.DistributedFileSystem$2.doCall(DistributedFileSystem.java:242)
      	at org.apache.hadoop.hdfs.DistributedFileSystem$2.doCall(DistributedFileSystem.java:238)
      	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
      	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:249)
      	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileBlockLocations(DistributedFileSystem.java:229)
      	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$listLeafFiles$3.apply(InMemoryFileIndex.scala:314)
      	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$listLeafFiles$3.apply(InMemoryFileIndex.scala:297)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
      	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
      	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
      	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
      	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$listLeafFiles(InMemoryFileIndex.scala:297)
      	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$bulkListLeafFiles$1.apply(InMemoryFileIndex.scala:174)
      	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$$anonfun$org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$bulkListLeafFiles$1.apply(InMemoryFileIndex.scala:173)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
      	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
      	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
      	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
      	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex$.org$apache$spark$sql$execution$datasources$InMemoryFileIndex$$bulkListLeafFiles(InMemoryFileIndex.scala:173)
      	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.listLeafFiles(InMemoryFileIndex.scala:126)
      	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.refresh0(InMemoryFileIndex.scala:91)
      	at org.apache.spark.sql.execution.datasources.InMemoryFileIndex.<init>(InMemoryFileIndex.scala:67)
      	at org.apache.spark.sql.execution.datasources.DataSource.tempFileIndex$lzycompute$1(DataSource.scala:161)
      	at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$tempFileIndex$1(DataSource.scala:152)
      	at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:166)
      	at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:261)
      	at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:94)
      	at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:94)
      	at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:33)
      	at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:196)
      	at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:206)
      	at com.hwx.StreamTest$.main(StreamTest.scala:97)
      	at com.hwx.StreamTest.main(StreamTest.scala)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
      	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:906)
      	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
      	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
      	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
      	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
      Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /rel/00171151/input/PJ/part-00136-b6403bac-a240-44f8-a792-fc2e174682b7-c000.csv
      	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
      	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
      	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2029)
      	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:2000)
      	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1913)
      	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:700)
      	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:377)
      	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
      	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
      	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
      	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
      	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
      	at java.security.AccessController.doPrivileged(Native Method)
      	at javax.security.auth.Subject.doAs(Subject.java:422)
      	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
      	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347)
      
      	at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1554)
      	at org.apache.hadoop.ipc.Client.call(Client.java:1498)
      	at org.apache.hadoop.ipc.Client.call(Client.java:1398)
      	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
      	at com.sun.proxy.$Proxy12.getBlockLocations(Unknown Source)
      	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:272)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:498)
      	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:290)
      	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:202)
      	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:184)
      	at com.sun.proxy.$Proxy13.getBlockLocations(Unknown Source)
      	at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1249)
      

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                hyukjin.kwon Hyukjin Kwon
                Reporter:
                hyukjin.kwon Hyukjin Kwon
              • Votes:
                0 Vote for this issue
                Watchers:
                2 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: