Spark: SPARK-22403

StructuredKafkaWordCount example fails in YARN cluster mode


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.2.0
    • Fix Version/s: 2.2.1, 2.3.0
    • Component/s: Structured Streaming
    • Labels: None

    Description

      When I run the StructuredKafkaWordCount example in YARN client mode, it runs fine. However, when I run it in YARN cluster mode, the application fails during initialization and dies after the default number of YARN application attempts. In the AM log, I see:

      17/10/30 11:34:52 INFO execution.SparkSqlParser: Parsing command: CAST(value AS STRING)
      17/10/30 11:34:53 ERROR streaming.StreamMetadata: Error writing stream metadata StreamMetadata(b71ca714-a7a1-467f-96aa-023375964429) to /data/yarn/nm/usercache/systest/appcache/application_1508800814252_0047/container_1508800814252_0047_01_000001/tmp/temporary-b5ced4ae-32e0-4725-b905-aad679aec9b5/metadata
      org.apache.hadoop.security.AccessControlException: Permission denied: user=systest, access=WRITE, inode="/":hdfs:supergroup:drwxr-xr-x
      	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:397)
      	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:256)
      	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:194)
      	at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1842)
      	at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1826)
      	at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1785)
      	at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.resolvePathForStartFile(FSDirWriteFileOp.java:315)
      	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2313)
      	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2257)
      ...
      	at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:280)
      	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1235)
      	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1214)
      	at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1152)
      	at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:458)
      	at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:455)
      	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
      	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:469)
      	at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:396)
      	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1103)
      	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1083)
      	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:972)
      	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:960)
      	at org.apache.spark.sql.execution.streaming.StreamMetadata$.write(StreamMetadata.scala:76)
      	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$6.apply(StreamExecution.scala:116)
      	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$6.apply(StreamExecution.scala:114)
      	at scala.Option.getOrElse(Option.scala:121)
      	at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:114)
      	at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:240)
      	at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
      	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)
      	at org.apache.spark.examples.sql.streaming.StructuredKafkaWordCount$.main(StructuredKafkaWordCount.scala:79)
      	at org.apache.spark.examples.sql.streaming.StructuredKafkaWordCount.main(StructuredKafkaWordCount.scala)
      

      Looking at StreamingQueryManager#createQuery, we have
      https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala#L198

          val checkpointLocation = userSpecifiedCheckpointLocation.map { ...
            ...
          }.orElse {
            ...
          }.getOrElse {
            if (useTempCheckpointLocation) {
              // Delete the temp checkpoint when a query is being stopped without errors.
              deleteCheckpointOnStop = true
              Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
            } else {
              ...
            }
          }
      

      And Utils.createTempDir has

        def createTempDir(
            root: String = System.getProperty("java.io.tmpdir"),
            namePrefix: String = "spark"): File = {
          val dir = createDirectory(root, namePrefix)
          ShutdownHookManager.registerShutdownDeleteDir(dir)
          dir
        }
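What matters here is that the directory is created under whatever java.io.tmpdir is in the current JVM, and that getCanonicalPath yields a bare absolute path with no filesystem scheme. A minimal stdlib-only sketch of that behavior follows, assuming java.nio.file.Files.createTempDirectory as a stand-in for Spark's createDirectory; the helper name sketchCreateTempDir is hypothetical, and the shutdown-hook deletion is omitted.

```scala
import java.io.File
import java.nio.file.{Files, Paths}

object TempDirSketch {
  // Hypothetical stand-in for Utils.createTempDir: creates a fresh directory
  // under `root` (java.io.tmpdir by default). Spark's shutdown-hook cleanup is omitted.
  def sketchCreateTempDir(
      root: String = System.getProperty("java.io.tmpdir"),
      namePrefix: String = "spark"): File =
    Files.createTempDirectory(Paths.get(root), namePrefix + "-").toFile

  def main(args: Array[String]): Unit = {
    val dir = sketchCreateTempDir(namePrefix = "temporary")
    // A plain absolute path under java.io.tmpdir, with no "file:" scheme attached.
    println(dir.getCanonicalPath)
    // File.toURI, by contrast, carries an explicit "file:" scheme.
    println(dir.toURI)
    dir.delete()
  }
}
```

Which directory this lands in depends entirely on the JVM's java.io.tmpdir, which is why the same code yields different paths in client mode and in the YARN AM.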
      

      In client mode, java.io.tmpdir is set to "/tmp", which also exists in HDFS and has permissions 1777. In cluster mode, java.io.tmpdir is set in the YARN AM to "$PWD/tmp", where PWD is "${yarn.nodemanager.local-dirs}/usercache/${user}/appcache/application_${appid}/container_${contid}".
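      The problem is that Spark uses java.io.tmpdir, which is a path in the local filesystem, as a path in HDFS. The canonical path returned by Utils.createTempDir carries no scheme, so when StreamMetadata.write creates the metadata file, the path is resolved against the default filesystem (fs.defaultFS), which here is HDFS. When that path is "/tmp", which happens to exist in HDFS, no problem arises, but only by coincidence. In cluster mode, the container directory does not exist in HDFS, so the permission check (checkAncestorAccess in the trace above) falls on the nearest existing ancestor, "/", which is owned by hdfs with mode drwxr-xr-x and is not writable by systest.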
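The coincidence can be illustrated without a cluster. Hadoop qualifies a path that has no scheme against the default filesystem, so a bare local path is treated as an HDFS path. The sketch below only approximates that rule with java.net.URI; the qualify helper and the hdfs://namenode:8020 address are hypothetical, and the real resolution is performed by Hadoop's Path and FileSystem classes, not by this code.

```scala
import java.net.URI

object QualifySketch {
  // Hypothetical approximation of Hadoop's rule: an absolute path with no scheme
  // takes the scheme and authority of the default filesystem. Absolute paths only.
  def qualify(path: String, defaultFs: URI): URI = {
    val uri = new URI(path)
    if (uri.getScheme != null) uri
    else new URI(defaultFs.getScheme, defaultFs.getAuthority, uri.getPath, null, null)
  }

  def main(args: Array[String]): Unit = {
    val defaultFs = new URI("hdfs://namenode:8020") // hypothetical NameNode address
    // Client mode: java.io.tmpdir is /tmp, which also exists in HDFS with mode 1777.
    println(qualify("/tmp/temporary-abc/metadata", defaultFs))
    // prints hdfs://namenode:8020/tmp/temporary-abc/metadata
    // Cluster mode: the AM's local container directory, absent from HDFS.
    val amTmp = "/data/yarn/nm/usercache/systest/appcache/application_1508800814252_0047/" +
      "container_1508800814252_0047_01_000001/tmp"
    println(qualify(s"$amTmp/temporary-abc/metadata", defaultFs))
    // An explicit file: URI stays on the local filesystem.
    println(qualify("file:/tmp/temporary-abc/metadata", defaultFs))
    // prints file:/tmp/temporary-abc/metadata
  }
}
```

Supplying a checkpoint location explicitly (the userSpecifiedCheckpointLocation branch in createQuery) avoids the temporary-directory fallback altogether.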

People

    • Assignee: Wing Yew Poon (wypoon)
    • Reporter: Wing Yew Poon (wypoon)
    • Votes: 0
    • Watchers: 3
