FLINK-7266

Don't attempt to delete parent directory on S3

    Details

    • Type: Bug
    • Status: Open
    • Priority: Blocker
    • Resolution: Unresolved
    • Affects Version/s: 1.3.1
    • Fix Version/s: 1.4.0, 1.3.2
    • Component/s: Core
    • Labels:
      None

      Description

      Currently, every attempted release of an S3 state object also checks whether the "parent directory" is empty and then tries to delete it.

      Not only is that unnecessary on S3, it is also prohibitively expensive: for example, it causes S3 to throttle the JobManager's calls during checkpoint cleanup.

      The file state handles must only attempt parent-directory cleanup when operating against real file systems, not when operating against object stores.
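
      For illustration, the problematic pattern looks roughly like the following sketch (the class and its names are illustrative and do not match Flink's actual state handle code; Path and FileSystem are Flink's org.apache.flink.core.fs types):

      import java.io.IOException;

      import org.apache.flink.core.fs.FileStatus;
      import org.apache.flink.core.fs.FileSystem;
      import org.apache.flink.core.fs.Path;

      // Illustrative sketch of the per-handle cleanup this issue objects to.
      final class StateFileRelease {

          static void discardState(Path filePath) throws IOException {
              final FileSystem fs = filePath.getFileSystem();
              fs.delete(filePath, false);

              // Harmless on a real file system, but on S3 this is an extra
              // LIST request per released state object -- enough to get the
              // JobManager throttled during checkpoint cleanup.
              final Path parent = filePath.getParent();
              final FileStatus[] contents =
                      parent == null ? null : fs.listStatus(parent);
              if (contents != null && contents.length == 0) {
                  fs.delete(parent, false);
              }
          }
      }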


          Activity

          Aljoscha Krettek added a comment -

          For 1.4, this will be resolved by only deleting the parent directory on the master (in the checkpoint coordinator).

          Stephan Ewen added a comment -

          True, this is a problem in 1.3.2 - the tradeoff was to either issue a very large number of redundant directory-emptiness requests (which cause checkpointing to stall or be throttled) or to leave the "directories" behind.

          In Flink 1.4 we want to fix this by letting the checkpoints understand the file structure and making the cleanup a single call that drops the directory, as Steve suggested.
          The current abstraction is overly generic (it just stores arbitrary byte chunks) and does not understand that checkpoint files cluster together in directories.

          Elias Levy added a comment -

          I am curious what the state of this is. It is still a problem on 1.3.2, making the use of S3 with the file system state backend very imprudent in production. You end up with thousands of empty "directories" in S3 for the checkpoints:

          $ sudo aws s3 ls --recursive s3://bucket/flink/checkpoints/58c7604fbc543b6df75b62601a9b4c9d/
          2017-09-15 23:03:15          0 flink/checkpoints/58c7604fbc543b6df75b62601a9b4c9d/chk-1/
          2017-09-15 23:04:15          0 flink/checkpoints/58c7604fbc543b6df75b62601a9b4c9d/chk-10/
          2017-09-15 23:14:07          0 flink/checkpoints/58c7604fbc543b6df75b62601a9b4c9d/chk-100/
          2017-09-15 23:14:14          0 flink/checkpoints/58c7604fbc543b6df75b62601a9b4c9d/chk-101/
          2017-09-15 23:14:20          0 flink/checkpoints/58c7604fbc543b6df75b62601a9b4c9d/chk-102/
          2017-09-15 23:15:12          0 flink/checkpoints/58c7604fbc543b6df75b62601a9b4c9d/chk-103/
          2017-09-15 23:15:18          0 flink/checkpoints/58c7604fbc543b6df75b62601a9b4c9d/chk-104/
          ...
          
          Steve Loughran added a comment -

          If you are using s3a, then delete(path, recursive=false) will stop you from trying to delete a non-empty dir.
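
          For reference, a minimal sketch of that call against the Hadoop FileSystem API (the bucket and path are placeholders, purely to illustrate the call):

          import org.apache.hadoop.conf.Configuration;
          import org.apache.hadoop.fs.FileSystem;
          import org.apache.hadoop.fs.Path;

          public class NonRecursiveDelete {
              public static void main(String[] args) throws Exception {
                  Path dir = new Path("s3a://bucket/flink/checkpoints/chk-1/");
                  FileSystem fs = dir.getFileSystem(new Configuration());

                  // recursive=false: deletes a file or an empty directory, but
                  // fails with an IOException on a non-empty directory instead
                  // of silently recursing into it.
                  boolean deleted = fs.delete(dir, false);
                  System.out.println("deleted: " + deleted);
              }
          }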

          Aljoscha Krettek added a comment -

          I think that's only part of the problem, because Flink must check on its own whether the directory is empty before it can delete it.

          The basic problem is that each state handle is being cleaned up individually. If we had the global knowledge that all state handles actually reside in one base directory, then we could shoot off an asynchronous command that deletes that whole sub-directory. (Which might still be horribly slow on S3 and not solve the problem at all.)
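
          A sketch of that idea, assuming the coordinator knows the checkpoint's base directory (the class and the executor parameter are placeholders, not existing Flink code):

          import java.io.IOException;
          import java.io.UncheckedIOException;
          import java.util.concurrent.CompletableFuture;
          import java.util.concurrent.Executor;

          import org.apache.flink.core.fs.FileSystem;
          import org.apache.flink.core.fs.Path;

          final class CheckpointDirectoryCleanup {

              // Fire-and-forget recursive delete of the whole checkpoint
              // directory, instead of one delete + emptiness check per handle.
              static CompletableFuture<Void> deleteDirectoryAsync(
                      Path checkpointDir, Executor ioExecutor) {
                  return CompletableFuture.runAsync(() -> {
                      try {
                          FileSystem fs = checkpointDir.getFileSystem();
                          fs.delete(checkpointDir, true);
                      } catch (IOException e) {
                          // As noted above, this may still be slow on S3; a
                          // real implementation would at least log failures.
                          throw new UncheckedIOException(e);
                      }
                  }, ioExecutor);
              }
          }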

          Steve Loughran added a comment -

          FWIW, in s3a we create a single delete request to rm all parent paths and don't bother doing the existence check.

          That is, for a file a/b/c.txt, after the file is written in close(), POST a delete list of

          /a/
          /a/b

          It's ~O(1) in depth, and since you don't need to wait for the response, it's even something you could do asynchronously.
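
          A sketch of what such a bulk delete looks like with the AWS SDK (v1) directly - bucket name and keys are placeholders; s3a batches the equivalent request internally:

          import com.amazonaws.services.s3.AmazonS3;
          import com.amazonaws.services.s3.AmazonS3ClientBuilder;
          import com.amazonaws.services.s3.model.DeleteObjectsRequest;

          public class BulkParentMarkerDelete {
              public static void main(String[] args) {
                  AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();

                  // One bulk request removes all parent "directory" markers of
                  // a/b/c.txt. No existence check is needed: deleting a key
                  // that does not exist is a no-op in S3.
                  s3.deleteObjects(new DeleteObjectsRequest("bucket")
                          .withKeys("a/", "a/b/"));
              }
          }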

          Aljoscha Krettek added a comment -

          This is actually resolved on release-1.3 for the s3 filesystem.

          Stefan Richter added a comment -

          I agree that we should not block the release on this. Just wanted to have this recorded with this issue, so that we can improve it for the future.

          Stephan Ewen added a comment -

          We could try to improve that by not doing the mkdirs() call in the stream factory for each state element. That might help, but I would not consider this a release blocker.

          I would try to solve this in a more holistic way in 1.4.0, by extending the FileSystem abstraction and adding post-state-release hooks in the Checkpoint Coordinator (so that there is one call to drop the directory marker file, if we cannot find a way for it to not be created in the first place).
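
          One way such an extension could look - the getKind()/FileSystemKind check below matches what Flink 1.4 eventually added to the FileSystem abstraction, while the hook class itself is hypothetical:

          import java.io.IOException;

          import org.apache.flink.core.fs.FileStatus;
          import org.apache.flink.core.fs.FileSystem;
          import org.apache.flink.core.fs.FileSystemKind;
          import org.apache.flink.core.fs.Path;

          // Hypothetical post-state-release hook; only the kind check mirrors
          // the actual Flink 1.4 FileSystem API.
          final class ParentDirectoryCleanupHook {

              static void maybeCleanupParent(Path filePath) throws IOException {
                  final FileSystem fs = filePath.getFileSystem();

                  // Object stores have no real directories, so skip the
                  // emptiness probe (and its throttled requests) entirely.
                  if (fs.getKind() != FileSystemKind.FILE_SYSTEM) {
                      return;
                  }

                  final Path parent = filePath.getParent();
                  final FileStatus[] contents =
                          parent == null ? null : fs.listStatus(parent);
                  if (contents != null && contents.length == 0) {
                      fs.delete(parent, false);
                  }
              }
          }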

          Stefan Richter added a comment -

          One comment about the "not necessary on S3" part: during the 1.3.2 release testing, I observed some empty directory entries remaining in S3. I added a screenshot in the testing document here. If this is not a problem, can the issue be closed, or is the merge into 1.4 still pending?


            People

            • Assignee:
              Stephan Ewen
            • Reporter:
              Stephan Ewen
            • Votes:
              0
            • Watchers:
              5
