Spark / SPARK-25778

WriteAheadLogBackedBlockRDD in YARN Cluster Mode Fails due to Lack of Access to tmpDir from $PWD to HDFS


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.1.0, 2.1.1, 2.1.2, 2.1.3, 2.2.0, 2.2.1, 2.2.2, 2.3.1, 2.3.2
    • Fix Version/s: 2.4.1, 3.0.0
    • Labels: None

    Description

WriteAheadLogBackedBlockRDD in YARN cluster mode fails due to lack of access to an HDFS path: the temporary directory it derives from $PWD inside the YARN container shares a name with an existing HDFS folder, so recovery is denied.

While attempting to use Spark Streaming with WriteAheadLogs, I noticed the following errors after the driver attempted to recover the already-read data that was being written to HDFS in the checkpoint folder. After spending many hours looking at the cause of the error below, I found it occurs because the parent folder /hadoop exists in our HDFS filesystem. I am wondering if it is possible to make an option configurable to choose an alternate bogus directory that will never be used, as sketched after the directory listing below.

      hadoop fs -ls /
      drwx------ - dsadm dsadm 0 2017-06-20 13:20 /hadoop
      hadoop fs -ls /hadoop/apps
      drwx------ - dsadm dsadm 0 2017-06-20 13:20 /hadoop/apps
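
      For illustration, a minimal sketch of the configurable option proposed above. The key name spark.streaming.wal.nonExistentDirectory is hypothetical (no such setting exists in Spark today); the default preserves the current java.io.tmpdir behavior.

      import java.io.File
      import java.util.UUID
      import org.apache.spark.SparkEnv

      // Hypothetical config key for an alternate "bogus" directory; not a real
      // Spark setting. Falls back to java.io.tmpdir, the current behavior.
      val fallbackDir = SparkEnv.get.conf.get(
        "spark.streaming.wal.nonExistentDirectory",
        System.getProperty("java.io.tmpdir"))
      val nonExistentDirectory =
        new File(fallbackDir, UUID.randomUUID().toString).getAbsolutePath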

streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala:

      // Builds a dummy "non-existent" directory from the local java.io.tmpdir;
      // on YARN this resolves under $PWD/tmp of the container.
      val nonExistentDirectory = new File(
        System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).getAbsolutePath
      // The schemeless absolute path is then handed to the WAL factory, which
      // resolves it against the default (HDFS) filesystem during recovery.
      writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
        SparkEnv.get.conf, nonExistentDirectory, hadoopConf)
      dataRead = writeAheadLog.read(partition.walRecordHandle)
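
      The root cause is visible outside Spark: a schemeless absolute path handed to the Hadoop Path API is qualified against fs.defaultFS (HDFS here), not the local filesystem. A minimal sketch, assuming the hdfs://tech default filesystem seen in the trace below:

      import java.io.File
      import java.net.URI
      import java.util.UUID
      import org.apache.hadoop.fs.Path

      // On YARN, java.io.tmpdir is $PWD/tmp inside the container working
      // directory, e.g. /hadoop/diskc/hadoop/yarn/local/usercache/.../tmp.
      val tmp = System.getProperty("java.io.tmpdir")
      val local = new File(tmp, UUID.randomUUID().toString).getAbsolutePath

      // Qualifying the schemeless string against the default FS turns it into
      // an HDFS path, so the NameNode checks EXECUTE on /hadoop/... and denies it.
      val onHdfs = new Path(local).makeQualified(URI.create("hdfs://tech"), new Path("/"))
      println(onHdfs) // hdfs://tech/<tmpdir>/<uuid>

      // One possible fix direction: carry an explicit file: scheme so the path
      // can never be mistaken for an HDFS location.
      val onLocal = new Path(new File(tmp, UUID.randomUUID().toString).toURI)
      println(onLocal) // file:/<tmpdir>/<uuid>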

      18/10/19 00:03:03 DEBUG YarnSchedulerBackend$YarnDriverEndpoint: Launching task 72 on executor id: 1 hostname: ha20t5002dn.tech.hdp.example.com.
      18/10/19 00:03:03 DEBUG BlockManager: Getting local block broadcast_4_piece0 as bytes
      18/10/19 00:03:03 DEBUG BlockManager: Level for block broadcast_4_piece0 is StorageLevel(disk, memory, 1 replicas)
      18/10/19 00:03:03 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on ha20t5002dn.tech.hdp.example.com:32768 (size: 33.7 KB, free: 912.2 MB)
      18/10/19 00:03:03 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 71, ha20t5002dn.tech.hdp.example.com, executor 1): org.apache.spark.SparkException: Could not read data from write ahead log record FileBasedWriteAheadLogSegment(hdfs://tech/user/hdpdevspark/sparkstreaming/Spark_Streaming_MQ_IDMS/receivedData/0/log-1539921695606-1539921755606,0,1017)
      at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:145)
      at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:173)
      at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD$$anonfun$compute$1.apply(WriteAheadLogBackedBlockRDD.scala:173)
      at scala.Option.getOrElse(Option.scala:121)
      at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:173)
      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
      at org.apache.spark.scheduler.Task.run(Task.scala:108)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)
      Caused by: org.apache.hadoop.security.AccessControlException: Permission denied: user=hdpdevspark, access=EXECUTE, inode="/hadoop/diskc/hadoop/yarn/local/usercache/hdpdevspark/appcache/application_1539554105597_0338/container_e322_1539554105597_0338_01_000002/tmp/170f36b8-9202-4556-89a4-64587c7136b6":dsadm:dsadm:drwx------
      at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:319)
      at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkTraverse(FSPermissionChecker.java:259)
      at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:205)
      at org.apache.ranger.authorization.hadoop.RangerHdfsAuthorizer$RangerAccessControlEnforcer.checkPermission(RangerHdfsAuthorizer.java:307)
      at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:190)
      at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1827)
      at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getFileInfo(FSDirStatAndListingOp.java:108)
      at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getFileInfo(FSNamesystem.java:3972)
      at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getFileInfo(NameNodeRpcServer.java:1130)
      at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getFileInfo(ClientNamenodeProtocolServerSideTranslatorPB.java:851)
      at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
      at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
      at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
      at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2313)
      at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2309)
      at java.security.AccessController.doPrivileged(Native Method)
      at javax.security.auth.Subject.doAs(Subject.java:422)
      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1740)
      at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2307)

      at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
      at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
      at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
      at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
      at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
      at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
      at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2110)
      at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
      at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
      at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
      at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
      at com.wandisco.fs.client.ReplicatedFC.xlateAndGetFileStatus(ReplicatedFC.java:283)
      at com.wandisco.fs.client.FusionHdfs.getFileStatus(FusionHdfs.java:277)
      at org.apache.spark.streaming.util.FileBasedWriteAheadLog.initializeOrRecover(FileBasedWriteAheadLog.scala:245)
      at org.apache.spark.streaming.util.FileBasedWriteAheadLog.<init>(FileBasedWriteAheadLog.scala:80)
      at org.apache.spark.streaming.util.WriteAheadLogUtils$$anonfun$2.apply(WriteAheadLogUtils.scala:142)
      at org.apache.spark.streaming.util.WriteAheadLogUtils$$anonfun$2.apply(WriteAheadLogUtils.scala:142)
      at scala.Option.getOrElse(Option.scala:121)
      at org.apache.spark.streaming.util.WriteAheadLogUtils$.createLog(WriteAheadLogUtils.scala:141)
      at org.apache.spark.streaming.util.WriteAheadLogUtils$.createLogForReceiver(WriteAheadLogUtils.scala:111)
      at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:140)
      ... 12 more
      Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=hdpdevspark, access=EXECUTE, inode="/hadoop/diskc/hadoop/yarn/local/usercache/hdpdevspark/appcache/application_1539554105597_0338/container_e322_1539554105597_0338_01_000002/tmp/170f36b8-9202-4556-89a4-64587c7136b6":dsadm:dsadm:drwx------
      at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:319)
      at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkTraverse(FSPermissionChecker.java:259)
      at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:205)
      at org.apache.ranger.authorization.hadoop.RangerHdfsAuthorizer$RangerAccessControlEnforcer.checkPermission(RangerHdfsAuthorizer.java:307)
      at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:190)
      at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1827)
      at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getFileInfo(FSDirStatAndListingOp.java:108)
      at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getFileInfo(FSNamesystem.java:3972)
      at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getFileInfo(NameNodeRpcServer.java:1130)
      at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getFileInfo(ClientNamenodeProtocolServerSideTranslatorPB.java:851)
      at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
      at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
      at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
      at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2313)
      at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2309)
      at java.security.AccessController.doPrivileged(Native Method)
      at javax.security.auth.Subject.doAs(Subject.java:422)
      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1740)
      at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2307)

      at org.apache.hadoop.ipc.Client.call(Client.java:1475)
      at org.apache.hadoop.ipc.Client.call(Client.java:1412)
      at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
      at com.sun.proxy.$Proxy17.getFileInfo(Unknown Source)
      at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:498)
      at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
      at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
      at com.sun.proxy.$Proxy18.getFileInfo(Unknown Source)
      at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2108)
      ... 26 more

      18/10/19 00:03:03 DEBUG YarnClusterScheduler: parentName: , name: TaskSet_3.0, runningTasks: 0
      18/10/19 00:03:03 DEBUG YarnClusterScheduler: parentName: , name: TaskSet_4.0, runningTasks: 1

Attachments

        1. SPARK-25778.2.patch (2 kB, Greg Senia)
        2. SPARK-25778.4.patch (1 kB, Greg Senia)
        3. SPARK-25778.patch (1 kB, Greg Senia)

People

    Assignee: Greg Senia (gss2002)
    Reporter: Greg Senia (gss2002)
    Votes: 0
    Watchers: 4
