  Flink / FLINK-5839 Flink Security problem collection / FLINK-6020

Blob Server cannot handle multiple parallel job submissions (with the same content)

    Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0, 1.4.0
    • Labels:
      None

      Description

      In yarn-cluster mode, if we submit the same job multiple times in parallel, the tasks run into class loading problems and HDFS lease contention.

      The blob server stores user jars under names derived from the sha1sum of their contents: it first writes a temp file and then moves it to the final name. For recovery it also uploads them to HDFS under the same file name.

      When multiple clients submit the same job with the same jar at the same time, the local jar files in the blob server and the corresponding files on HDFS are handled by multiple threads (BlobServerConnection), which interfere with each other.

      It would be better to have a proper way to handle this; two ideas come to mind (a minimal sketch of the second follows below):
      1. lock the write operation, or
      2. use some unique identifier as the file name instead of (or in addition to) the sha1sum of the file contents.
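
      For illustration, here is a minimal sketch of idea 2 (the class and method names are hypothetical, not Flink's actual BlobKey code): keep the sha1sum of the contents as the base name, but append a per-submission suffix so that two parallel submissions of the same jar never race on one final file name. As the discussion below points out, this trades away the de-duplication that pure content-addressed naming provides.

      ```
      import java.io.IOException;
      import java.io.InputStream;
      import java.security.DigestInputStream;
      import java.security.MessageDigest;
      import java.security.NoSuchAlgorithmException;
      import java.util.UUID;

      final class SuffixedBlobNaming {

          /**
           * Hashes the jar contents (as the blob server does for content-addressed
           * names) and appends a per-submission suffix so that concurrent uploads
           * of identical content never target the same final file name.
           */
          static String uniqueBlobName(InputStream jarContents) throws IOException, NoSuchAlgorithmException {
              MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
              try (DigestInputStream in = new DigestInputStream(jarContents, sha1)) {
                  byte[] buffer = new byte[8192];
                  while (in.read(buffer) != -1) {
                      // reading the stream drives the digest; the bytes themselves are not needed here
                  }
              }
              StringBuilder hex = new StringBuilder();
              for (byte b : sha1.digest()) {
                  hex.append(String.format("%02x", b));
              }
              // the suffix is what avoids the name collision, at the cost of de-duplication
              return hex.toString() + "_" + UUID.randomUUID();
          }
      }
      ```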


          Activity

          WangTao Tao Wang added a comment -

          I've found a way to solve this by adding a random integer suffix to blob key. Will post the commit later.

          githubbot ASF GitHub Bot added a comment -

          GitHub user WangTaoTheTonic opened a pull request:

          https://github.com/apache/flink/pull/3525

          FLINK-6020 Add a random integer suffix to the blob key to avoid naming conflicts

          In yarn-cluster mode, if we submit the same job multiple times in parallel, the tasks run into class loading problems and HDFS lease contention.

          The blob server stores user jars under names derived from the sha1sum of their contents: it first writes a temp file and then moves it to the final name. For recovery it also uploads them to HDFS under the same file name.

          When multiple clients submit the same job with the same jar at the same time, the local jar files in the blob server and the corresponding files on HDFS are handled by multiple threads (BlobServerConnection), which interfere with each other.

          I've found a way to solve this by adding a random integer suffix to the blob key, as changed here.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/WangTaoTheTonic/flink FLINK-6020

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3525.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3525


          commit 3d9f41afad9c831431b3c7bd0eb2a8006b92718e
          Author: WangTaoTheTonic <wangtao111@huawei.com>
          Date: 2017-03-13T11:52:36Z

          add a random integer suffix to blob key to avoid naming conflicting


          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3525

          I don't quite understand the issue. Currently, the name should exactly match the hash to make sure that each library is stored only once. Adding a random suffix destroys exactly that behavior.

          In the case where multiple clients upload the same jar to different clusters, it should not be a problem if they use different storage directories (which they should definitely do).

          In the case where multiple clients upload the same jar to the same cluster, the first rename from tmp to file will succeed. The second rename from tmp to file will fail, but that's not a problem, because the file already exists with the same contents, and the client can assume success.

          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on the issue:

          https://github.com/apache/flink/pull/3525

          The second rename will not fail, but it corrupts the file written by the first one, which makes the first job fail if a task is loading this jar at that moment.

          By the way, the jar file is also uploaded to HDFS for recovery, and that upload fails as well if two or more clients write a file with the same name.

          It is easy to reproduce. First launch a session with enough slots, then run a script containing many identical job submissions, say 20 lines of "flink run ../examples/streaming/WindowJoin.jar &". Make sure there is a "&" at the end of each line so they run in parallel.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3525

          I think we should then fix this in the blob server.

          The problem that only one writer should succeed upon a collision should be fixable by using `Files.move()` with `ATOMIC_MOVE`. Only when that succeeds do we store the file in the blob store.

          What do you think?

          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on the issue:

          https://github.com/apache/flink/pull/3525

          Right... I had the same thought as you at the beginning and tried to make the move atomic, but it has several side effects, like:
          1. if we handle it this way, two jobs can share the same jar file in the blob server, which becomes a problem when one of them is canceled and deletes its jars (right now it does not seem to do the delete, but it should);
          2. for job recovery (or other kinds of recovery, I'm not sure, I just observed the phenomenon) the blob server uploads jars to HDFS using the same name as the local file. Even if the two jobs share the same jar in the blob store, they will upload it twice at the same time, which causes a file lease conflict in HDFS.

          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on the issue:

          https://github.com/apache/flink/pull/3525

          ping @StephanEwen

          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on the issue:

          https://github.com/apache/flink/pull/3525

          Hi Stephan, could you help review? @StephanEwen

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3525

          I am currently travelling and then attending Flink Forward. Will come back to this after that.

          Quick feedback:

          • I am still thinking that the random suffix breaks the original idea of the cached blobs.
          • The blob manager counts references to files and does not delete them as long as someone has a reference. That prevents deletion if multiple parties work with the same jar.
          • Properly handling rename plus add-reference under one lock, as well as de-reference plus delete under the same lock, should fix it, I think (a small sketch of this follows below).
          • The blob manager needs to make sure it has an exclusive directory, so that no other process accesses the files. But I think that is the case already.
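
          A minimal sketch of the reference-counting idea from the bullet points above (class and method names are hypothetical, not the actual blob manager code): registering a user of a blob happens together with publishing the file, and a file may only be deleted by the caller that releases the last reference, all under the same lock.

          ```
          import java.util.HashMap;
          import java.util.Map;

          final class BlobReferenceCounter {

              private final Map<String, Integer> refCounts = new HashMap<>();

              /** Registers one more user of the blob; called together with the rename, under one lock. */
              synchronized void register(String blobKey) {
                  refCounts.merge(blobKey, 1, Integer::sum);
              }

              /**
               * Releases one user of the blob. Returns true only when the last reference
               * is gone, i.e. only then may the caller actually delete the file
               * (de-reference and delete thus happen under the same lock).
               */
              synchronized boolean release(String blobKey) {
                  Integer remaining = refCounts.computeIfPresent(blobKey, (key, count) -> count - 1);
                  if (remaining != null && remaining <= 0) {
                      refCounts.remove(blobKey);
                      return true;
                  }
                  return false;
              }
          }
          ```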
          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3525

          @WangTaoTheTonic I will take this issue next week. I think we can fix this using `Files.move(src, dest, ATOMIC_MOVE)` to prevent multiple jobs from getting in each other's way. That way we preserve the cross-job caching behavior and should fix the issue you described.

          githubbot ASF GitHub Bot added a comment -

          Github user netguy204 commented on the issue:

          https://github.com/apache/flink/pull/3525

          +1 I'm looking forward to this fix as I think I'm encountering this bug in production.

          I bundle my jobs into a single JAR file with multiple mains. I submit the jobs to the cluster sequentially (once the cluster accepts one I submit the next). My job also has two dependency JARs that I provide via HTTP using the -C switch to flink.

          When a job fails it automatically restarts but it seems to cause other jobs from the same JAR to fail and restart as well. The error is always some variation of:

          ```
          java.lang.IllegalStateException: zip file closed
          at java.util.zip.ZipFile.ensureOpen(ZipFile.java:669)
          at java.util.zip.ZipFile.getEntry(ZipFile.java:309)
          at java.util.jar.JarFile.getEntry(JarFile.java:240)
          at sun.net.www.protocol.jar.URLJarFile.getEntry(URLJarFile.java:128)
          at java.util.jar.JarFile.getJarEntry(JarFile.java:223)
          at sun.misc.URLClassPath$JarLoader.getResource(URLClassPath.java:1005)
          at sun.misc.URLClassPath$JarLoader.findResource(URLClassPath.java:983)
          at sun.misc.URLClassPath.findResource(URLClassPath.java:188)
          at java.net.URLClassLoader$2.run(URLClassLoader.java:569)
          at java.net.URLClassLoader$2.run(URLClassLoader.java:567)
          at java.security.AccessController.doPrivileged(Native Method)
          at java.net.URLClassLoader.findResource(URLClassLoader.java:566)
          at java.lang.ClassLoader.getResource(ClassLoader.java:1093)
          at java.net.URLClassLoader.getResourceAsStream(URLClassLoader.java:232)
          .... backtrace from some arbitrary point in my code that never is doing anything with reflection ...
          ```

          The class load that triggers the fault is arbitrary. The same job may fail and restart multiple times in the same day with a different failing class load.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3525

          @netguy204 I think you are affected by a different issue. In your case, there are no damaged jar files, but it looks like the classloader has been closed.

          Flink creates classloaders per job and caches them across different tasks of that job. It closes the dynamically created classloaders when all tasks from the job are done.

          Is it possible that a classloader passes between jobs, meaning that one job uses a class loader that was created for another job? Do you store some objects / classes / classloaders somewhere in a static context, a cache, or an interner, so that one job may create them and another job re-use them?

          githubbot ASF GitHub Bot added a comment -

          Github user netguy204 commented on the issue:

          https://github.com/apache/flink/pull/3525

          @StephanEwen Yes, I do have at least objects and classes stored in a static context. An easy example (that has also bitten me a few times) is the class cache that Avro maintains:

          https://github.com/apache/avro/blob/master/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java#L146

          The Avro APIs, unless told otherwise, use a singleton instance of SpecificData and access that shared cache.

          Would something like that be enough to cause the classloader to pass between jobs?

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3525

          Yes, @netguy204 - that is definitely one possible way for class loaders to leak over...
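
          To make that leak path concrete, here is a sketch of a commonly suggested workaround (the helper class is illustrative; verify the constructors against the Avro version in use): the process-wide `SpecificData.get()` singleton caches `Class` objects, and each cached `Class` keeps its defining classloader reachable even after the job that loaded it has finished. Scoping a `SpecificData` instance to the job's user-code classloader avoids pinning it.

          ```
          import org.apache.avro.Schema;
          import org.apache.avro.specific.SpecificData;
          import org.apache.avro.specific.SpecificDatumReader;

          final class PerJobAvroReaders {

              /**
               * Builds a datum reader backed by a SpecificData instance scoped to the
               * given (per-job) classloader instead of the process-wide singleton from
               * SpecificData.get(). When the job's classloader is released, this
               * SpecificData and its class cache become unreachable along with it.
               */
              static <T> SpecificDatumReader<T> readerFor(Schema schema, ClassLoader jobClassLoader) {
                  SpecificData perJobData = new SpecificData(jobClassLoader);
                  return new SpecificDatumReader<>(schema, schema, perJobData);
              }
          }
          ```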

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3525

          @WangTaoTheTonic I have debugged a bit further in this issue, and it seems there is a bit more to do.
          For non-HA blob servers, the atomic rename fix would do it.

          For HA cases, we need to do a bit more. A recent change was that the blob cache will try to fetch blobs directly from the blob store, which may cause premature reads before the blob has been fully written. Because the storage systems we target for HA do not all support atomic renames (S3 does not), we need to use the `_SUCCESS` file trick to mark completed blobs.

          I chatted with @tillrohrmann about that, he agreed to take a look at fixing these and will make an effort to get this into the 1.3 release. Hope that this will work for you.
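
          A minimal sketch of the `_SUCCESS` marker idea (hypothetical names; a local `Path` stands in for the remote store, which would really be accessed through its own client API): the writer publishes the data object first and creates a zero-length marker only after the write has completed, and readers treat the blob as visible only once the marker exists.

          ```
          import java.io.IOException;
          import java.nio.file.Files;
          import java.nio.file.Path;
          import java.nio.file.StandardCopyOption;

          final class SuccessMarkerStore {

              /** Writes the blob first, then a zero-length marker that signals completion. */
              static void put(Path localBlob, Path storeBlob, Path storeMarker) throws IOException {
                  Files.copy(localBlob, storeBlob, StandardCopyOption.REPLACE_EXISTING);
                  // readers only trust storeBlob once this marker exists
                  Files.write(storeMarker, new byte[0]);
              }

              /** A blob is readable only when both the data and its completion marker exist. */
              static boolean isComplete(Path storeBlob, Path storeMarker) {
                  return Files.exists(storeMarker) && Files.exists(storeBlob);
              }
          }
          ```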

          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on the issue:

          https://github.com/apache/flink/pull/3525

          For the HA case, the blob server will upload jars to HDFS for recovery, and there are concurrent operations there too. I'm not sure if the solution you proposed covers that.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3525

          @WangTaoTheTonic I think we can solve that the following way:

          • The local upload uses `ATOMIC_MOVE` to rename the file
          • Only the thread that succeeds will store the blob in HDFS or S3

          What do you think?
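
          A minimal sketch of that two-step proposal (hypothetical names, not Flink's actual BlobServer or BlobStore interfaces): the local publish uses `ATOMIC_MOVE`, and only the thread whose move succeeded mirrors the blob to the HA store, so HDFS or S3 sees exactly one writer per blob name.

          ```
          import java.io.IOException;
          import java.nio.file.FileAlreadyExistsException;
          import java.nio.file.Files;
          import java.nio.file.Path;
          import java.nio.file.StandardCopyOption;

          final class SingleWriterHaUpload {

              /** Hypothetical stand-in for the HA blob store (HDFS, S3, ...). */
              interface HaBlobStore {
                  void put(Path localFile, String blobName) throws IOException;
              }

              /**
               * Publishes the temp file locally via ATOMIC_MOVE and lets only the winning
               * thread mirror the blob to the HA store, so concurrent submissions of the
               * same jar never write the same remote file at the same time.
               */
              static void publish(Path tempFile, Path finalFile, String blobName, HaBlobStore haBlobStore)
                      throws IOException {
                  try {
                      Files.move(tempFile, finalFile, StandardCopyOption.ATOMIC_MOVE);
                  } catch (FileAlreadyExistsException e) {
                      // lost the race: a file with identical content is already published
                      // (on file systems where ATOMIC_MOVE silently replaces the target
                      // instead of throwing, that replace is equally harmless)
                      Files.deleteIfExists(tempFile);
                      return;
                  }
                  haBlobStore.put(finalFile, blobName); // exactly one uploader per blob name
              }
          }
          ```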

          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on the issue:

          https://github.com/apache/flink/pull/3525

          That looks good to me. Looking forward to the fix from @tillrohrmann. Thank you very much.

          githubbot ASF GitHub Bot added a comment -

          GitHub user tillrohrmann opened a pull request:

          https://github.com/apache/flink/pull/3888

          FLINK-6020 Introduce BlobServer#readWriteLock to synchronize file creation and deletion

          This PR is based on #3873 and #3864.

          This commit introduces a BlobServer#readWriteLock in order to synchronize file creation
          and deletion operations in BlobServerConnection and BlobServer. This prevents
          multiple put, delete, and get operations from interfering with each other.

          The get operations are synchronized using the read lock in order to guarantee some kind of
          parallelism.

          What this PR does not address is the handling of concurrent writes and reads to the `BlobStore`. This could be solved via SUCCESS files in order to indicate the completion of a file. However, the first read operation should now happen strictly after the write operation due to the locking.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/tillrohrmann/flink concurrentBlobUploads

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3888.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3888


          commit 28bc0acd5c2d3858e41fd29113fff1c7a40471f5
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2017-05-11T15:36:17Z

          FLINK-6555 [futures] Generalize ConjunctFuture to return results

          The ConjunctFuture now returns the set of future values once it is completed.

          commit f221353584c9089552572387eee9e162695311cd
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2017-05-12T09:05:13Z

          Introduce WaitingConjunctFuture; Fix thread safety issue with ResultConjunctFuture

          The WaitingConjunctFuture waits for the completion of its futures. The future values
          are discarded making it more efficient than the ResultConjunctFuture which returns
          the futures' values. The WaitingConjunctFuture is instantiated via
          FutureUtils.waitForAll(Collection<Future>).

          commit 79eafa2671f4a4dfdc9ab135443c339ef2e8001a
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2017-05-09T08:26:37Z

          FLINK-6519 Integrate BlobStore in lifecycle management of HighAvailabilityServices

          The HighAvailabilityService creates a single BlobStoreService instance which is
          shared by all BlobServer and BlobCache instances. The BlobStoreService's lifecycle
          is exclusively managed by the HighAvailabilityServices. This means that the
          BlobStore's content is only cleaned up if the HighAvailabilityService's HA data
          is cleaned up. Having this single point of control, makes it easier to decide when
          to discard HA data (e.g. in case of a successful job execution) and when to retain
          the data (e.g. for recovery).

          Close and cleanup all data of BlobStore in HighAvailabilityServices

          Use HighAvailabilityServices to create BlobStore

          Introduce BlobStoreService interface to hide close and closeAndCleanupAllData methods

          commit c6ff2ced58e60f63d0236e53c83192f64479c44a
          Author: Till Rohrmann <trohrmann@apache.org>
          Date: 2017-05-10T15:38:49Z

          FLINK-6020 Introduce BlobServer#readWriteLock to synchronize file creation and deletion

          This commit introduces a BlobServer#readWriteLock in order to synchronize file creation
          and deletion operations in BlobServerConnection and BlobServer. This will prevent
          that multiple put and get operations interfere with each other and with get operations.

          The get operations are synchronized using the read lock in order to guarantee some kind of
          parallelism.


          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/3525

          @WangTaoTheTonic I've opened a PR which addresses the problem: #3888. If you like, then you can try it out and see if it solves your problem.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3888#discussion_r116446914

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java —

          @@ -235,8 +235,54 @@ else if (contentAddressable == CONTENT_ADDRESSABLE)
                { return; }
                // from here on, we started sending data, so all we can do is close the connection when something happens
          +     readLock.lock();
          +
                try {
          +         try {
          +             if (!blobFile.exists()) {
          +                 // first we have to release the read lock in order to acquire the write lock
          +                 readLock.unlock();
          +                 writeLock.lock();

          — End diff —

          In between upgrading from the read lock to the write lock, multiple threads can reach this point, and as far as I can see a file can then be written more often than required. I assume the code still produces a correct result, but it could do duplicate work. An obvious fix would be to re-check `blobFile.exists()` under the write lock, but I'm not sure whether the cost of another metadata query per write offsets the occasional, but unlikely, duplicate work.
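
          The duplicate-work observation boils down to the classic check-then-recheck pattern (a sketch with hypothetical names, not the actual BlobServerConnection code): a ReentrantReadWriteLock cannot be upgraded in place, so the read lock has to be released before the write lock is taken, and the existence check must be repeated once the write lock is held so that only one thread actually writes the file.

          ```
          import java.io.File;
          import java.util.concurrent.locks.Lock;
          import java.util.concurrent.locks.ReentrantReadWriteLock;

          final class GuardedBlobWrite {

              private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

              void getOrCreate(File blobFile, Runnable writeFile) {
                  Lock readLock = lock.readLock();
                  Lock writeLock = lock.writeLock();

                  readLock.lock();
                  try {
                      if (!blobFile.exists()) {
                          // ReentrantReadWriteLock does not support upgrading, so release
                          // the read lock before acquiring the write lock
                          readLock.unlock();
                          writeLock.lock();
                          try {
                              // several threads may get here; re-checking under the write
                              // lock ensures only the first one writes the file
                              if (!blobFile.exists()) {
                                  writeFile.run();
                              }
                          } finally {
                              // downgrade: re-acquire the read lock before releasing the write lock
                              readLock.lock();
                              writeLock.unlock();
                          }
                      }
                      // serve the blob while holding the read lock
                  } finally {
                      readLock.unlock();
                  }
              }
          }
          ```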

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3888#discussion_r116457501

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java —

          @@ -235,8 +235,54 @@ else if (contentAddressable == CONTENT_ADDRESSABLE)
                { return; }
                // from here on, we started sending data, so all we can do is close the connection when something happens
          +     readLock.lock();
          +
                try {
          +         try {
          +             if (!blobFile.exists()) {
          +                 // first we have to release the read lock in order to acquire the write lock
          +                 readLock.unlock();
          +                 writeLock.lock();

          — End diff —

          True, I'm also not sure which version would be more efficient. Given that the file transfer dominates the execution time here, I assume that the additional check won't hurt.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/3888

          Thanks for the quick review @StefanRRichter. I will address your comment and also add a test case for the delete path.

          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on the issue:

          https://github.com/apache/flink/pull/3525

          Thanks for your fix, I'll check in a day or two.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3888

          till.rohrmann Till Rohrmann added a comment -

          1.4.0 7ad489d87281b74c53d3b1a0dd97e56b7a8ef303
          1.3.0 60873b0c57be7b83d55af179b4be4defc46d80de

          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic commented on the issue:

          https://github.com/apache/flink/pull/3525

          @tillrohrmann I've tried with your commit and the issue is resolved, thanks. Closing this PR.

          githubbot ASF GitHub Bot added a comment -

          Github user WangTaoTheTonic closed the pull request at:

          https://github.com/apache/flink/pull/3525

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on the issue:

          https://github.com/apache/flink/pull/3525

          Great to hear @WangTaoTheTonic. Thanks a lot for testing.


            People

            • Assignee: Till Rohrmann (till.rohrmann)
            • Reporter: Tao Wang (WangTao)
            • Votes: 0
            • Watchers: 4
