  Spark / SPARK-6313

Fetch-file lock file creation doesn't work when the Spark working dir is on an NFS mount

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 1.2.0, 1.2.1, 1.3.0
    • Fix Version/s: 1.2.2, 1.3.1, 1.4.0
    • Component/s: Spark Core
    • Labels:
      None
    • Target Version/s:

      Description

      When running in cluster mode with the Spark work dir mounted on an NFS volume (or any volume which doesn't support file locking), the fetchFile method in Spark's Utils class (used for downloading JARs etc. on the executors) will fail. This file locking was introduced as an improvement with SPARK-2713.

      See https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/Utils.scala#L415

      Introduced in 1.2 in commit: https://github.com/apache/spark/commit/7aacb7bfad4ec73fd8f18555c72ef696

      As this locking is only an optimisation for fetching files, could we take a different approach here and create a temp/advisory lock file?

      Typically you would just mount local disks (in, say, ext4 format) and provide these as a comma-separated list; however, we are trying to run Spark on MapR. With MapR we can do a loopback mount to a volume on the local node and take advantage of MapR's disk pools. This also means we don't need specific mounts for Spark, which improves the generic nature of the cluster.
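
      A rough sketch of the temp/advisory lock file idea suggested above (purely illustrative, not the actual Spark implementation; the helper name is made up). It relies on java.io.File.createNewFile, which atomically creates the file only if it does not already exist and does not require byte-range locking support from the mount (on NFS even this is only best-effort, hence "advisory"):

      import java.io.File

      // Illustrative advisory lock: atomic create-if-absent instead of
      // FileChannel.lock(), which throws on mounts without locking support.
      def withAdvisoryLock[T](lockFile: File)(body: => T): Option[T] = {
        if (lockFile.createNewFile()) {
          try Some(body) finally lockFile.delete()
        } else {
          None // another executor holds the lock; caller can wait or fetch directly
        }
      }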


          Activity

          nemccarthy Nathan McCarthy added a comment -

          Suggestion along the lines of: https://github.com/apache/lucene-solr/blob/5314a56924f46522993baf106e6deca0e48a967f/lucene/core/src/java/org/apache/lucene/store/SimpleFSLockFactory.java or https://github.com/graphhopper/graphhopper/blob/master/core/src/main/java/com/graphhopper/storage/SimpleFSLockFactory.java

          nemccarthy Nathan McCarthy added a comment - edited

          Since the

          val lockFileName = s"${url.hashCode}${timestamp}_lock"

          uses a timestamp, I can't see there being too many problems with hanging or leftover lock files.

          joshrosen Josh Rosen added a comment -

          Could you update this ticket with more details on the error message or symptom that you've observed (such as a stack trace)? This would be helpful to make this issue more searchable / discoverable.

          joshrosen Josh Rosen added a comment -

          Thanks for the pointer to the Lucene lock factory code.

          It's fine for the locks to be advisory in the sense that things shouldn't break if multiple executors acquire the lock and try to download the same file, but there's potentially a problem if the lock isn't released after the JVM that acquired it exits abnormally, since this could cause other executors to block indefinitely while waiting for the original lock owner to download the file. One approach might be to write the PID of the original lock owner into the lock file, which would allow blocked executors to time out and re-attempt the lock acquisition if they detect that the original lock holder died. This might face its own portability challenges, though, and seems complex.
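
          As a rough illustration of that idea (a sketch only; the timeout, polling interval, and stale-lock handling are assumptions, and this is not the code that was merged):

          import java.io.File
          import java.lang.management.ManagementFactory
          import java.nio.charset.StandardCharsets
          import java.nio.file.Files

          // Sketch: acquire an advisory lock by atomically creating the lock file
          // and recording this JVM's PID; if the file already exists, poll until a
          // deadline and then assume the owner died and break the stale lock.
          def acquirePidLock(lockFile: File, maxWaitMs: Long = 60000L, pollMs: Long = 500L): Unit = {
            val pid = ManagementFactory.getRuntimeMXBean.getName.split("@")(0) // "pid@host"
            val deadline = System.currentTimeMillis() + maxWaitMs
            while (!lockFile.createNewFile()) {
              if (System.currentTimeMillis() > deadline) {
                lockFile.delete() // assume the previous owner died; steal the lock
              } else {
                Thread.sleep(pollMs)
              }
            }
            Files.write(lockFile.toPath, pid.getBytes(StandardCharsets.UTF_8))
          }

          Checking whether the recorded PID is still alive would make the staleness test more precise, but as noted above that check is platform-specific.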

          A simple hotfix might be to add a SparkConf setting to always force this caching to be bypassed (this would be a two-line change to Executor.scala). This might lose the performance benefits of the caching, though.
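
          Such a guard might look roughly like this (illustrative only, not the actual Executor.scala diff; the config key matches the one that ultimately shipped, per the resolution below):

          import org.apache.spark.SparkConf

          // Illustrative: consult a boolean flag; when it is false, skip the shared
          // lock-file/cache path and fetch straight into the executor's work dir.
          def shouldUseFetchCache(conf: SparkConf): Boolean =
            conf.getBoolean("spark.files.useFetchCache", true)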

          If you're using NFS and the shared filesystem is mounted at the same path on all nodes, I think that you should be able to use local://path/to/nfs/ to specify the paths to your files / JARs, which will cause them to be read from the executor-local filesystem rather than fetched remotely. In this case, this would cause them to be read from NFS, so you may be able to use this technique to recover any performance benefits for large files that would be lost by disabling the caching.
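
          For example (hypothetical paths; assumes the JAR already exists at the same NFS-backed location on every node, and that the master URL is supplied by spark-submit):

          import org.apache.spark.{SparkConf, SparkContext}

          // local: URIs are resolved on each node's own filesystem (here the shared
          // NFS mount), so nothing is fetched or lock-file-cached by the executors.
          val sc = new SparkContext(new SparkConf().setAppName("nfs-local-jars"))
          sc.addJar("local:/mnt/nfs/libs/my-dep.jar")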

          I'd be happy to review patches for this issue.

          nemccarthy Nathan McCarthy added a comment -

          Stacktrace:

          14/12/12 18:18:24 WARN scheduler.TaskSetManager: Lost task 7.0 in stage 0.0 (TID 8, hadoop-016): java.io.IOException: Permission denied
          at sun.nio.ch.FileDispatcherImpl.lock0(Native Method)
          at sun.nio.ch.FileDispatcherImpl.lock(FileDispatcherImpl.java:91)
          at sun.nio.ch.FileChannelImpl.lock(FileChannelImpl.java:1022)
          at java.nio.channels.FileChannel.lock(FileChannel.java:1052)
          at org.apache.spark.util.Utils$.fetchFile(Utils.scala:379)
          at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:350)
          at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:347)
          at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
          at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
          at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
          at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
          at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
          at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
          at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
          at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:347)
          at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
          at java.lang.Thread.run(Thread.java:745)

          pwendell Patrick Wendell added a comment -

          Josh Rosen, changing the default caching behavior seems like it could silently regress performance for the vast majority of users who aren't on NFS. What about a hotfix for 1.3.1 that just exposes the config for NFS users (this is a very small population) but doesn't change the default? That may be sufficient in itself... or if we want a real fix that makes it work out-of-the-box on NFS, we can put it in 1.4.

          apachespark Apache Spark added a comment -

          User 'nemccarthy' has created a pull request for this issue:
          https://github.com/apache/spark/pull/5036

          nemccarthy Nathan McCarthy added a comment -

          Thanks for the feedback, guys. The config option workaround seems like the path of least resistance for now, with more testing required for a different implementation. For us it would be great if we could get a fix ASAP. I've created PR 5036: https://github.com/apache/spark/pull/5036

          joshrosen Josh Rosen added a comment -

          Issue resolved by pull request 5036
          https://github.com/apache/spark/pull/5036

          joshrosen Josh Rosen added a comment -

          I've merged Nathan's patch into 1.4.0, 1.3.1, and 1.2.2. After this patch, users can work around this bug by setting spark.files.useFetchCache=false in their SparkConf.
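
          For reference, the workaround looks like this in application code (the app name is an assumption, and the master URL is assumed to come from spark-submit; the setting can equally go in spark-defaults.conf or be passed via --conf):

          import org.apache.spark.{SparkConf, SparkContext}

          // Disable the executor fetch-file cache so no lock file is created on the
          // NFS-backed work dir (available in 1.2.2, 1.3.1 and 1.4.0+).
          val conf = new SparkConf()
            .setAppName("example")
            .set("spark.files.useFetchCache", "false")
          val sc = new SparkContext(conf)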


            People

            • Assignee:
              nemccarthy Nathan McCarthy
              Reporter:
              nemccarthy Nathan McCarthy
            • Votes:
              0
              Watchers:
              5
