FLINK-5800: Make sure that the CheckpointStreamFactory is instantiated once per operator only

    Details

      Description

      Previously, the CheckpointStreamFactory was created once per checkpoint, and its repeated initialization logic (such as ensuring that the parent directory exists) caused unnecessary load on some file systems at very large scale.

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user StephanEwen opened a pull request:

          https://github.com/apache/flink/pull/3312

          FLINK-5800 [checkpointing] Create CheckpointStreamFactory only once per operator

          Previously, the factory was created once per checkpoint, and its repeated initialization logic
          (like ensuring existence of base paths) caused heavy load on some filesystems at very large scale.

          This issue was reported by (and the solution suggested by) Steven Wu.

          Core Changes

          The `CheckpointStreamFactory` is now created once per operator upon initialization of the operator. The factory is also stored in the operator, alongside the `KeyedStateBackend` and the `OperatorStateBackend`.
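
The create-once pattern described above can be illustrated with a minimal sketch. It is not Flink's actual code: `StreamFactory`, `CountingBackend`, `SketchOperator`, and all method names are hypothetical stand-ins, and the counter merely represents the costly initialization (such as checking that a base path exists).

```java
// Hypothetical stand-in for a checkpoint stream factory; NOT Flink's interface.
interface StreamFactory {
    String createStream(long checkpointId);
}

// Hypothetical backend whose factory creation is assumed to be expensive,
// for example because it checks that a base directory exists.
class CountingBackend {
    int factoriesCreated = 0;

    StreamFactory createStreamFactory(String operatorId) {
        factoriesCreated++; // stands in for the costly file system call
        return checkpointId -> operatorId + "/chk-" + checkpointId;
    }
}

// Hypothetical operator that creates the factory once during initialization
// and stores it in a field, reusing it for every checkpoint.
class SketchOperator {
    private final StreamFactory streamFactory;

    SketchOperator(CountingBackend backend, String operatorId) {
        this.streamFactory = backend.createStreamFactory(operatorId);
    }

    String snapshotState(long checkpointId) {
        // No factory creation here: only the stored instance is used.
        return streamFactory.createStream(checkpointId);
    }
}

class FactoryPerOperatorSketch {
    public static void main(String[] args) {
        CountingBackend backend = new CountingBackend();
        SketchOperator operator = new SketchOperator(backend, "op-1");
        for (long id = 1; id <= 3; id++) {
            System.out.println(operator.snapshotState(id));
        }
        System.out.println("factories created: " + backend.factoriesCreated);
    }
}
```

In this sketch the creation counter stays at one regardless of how many checkpoints are taken, which is the property the change aims for.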

          Tests

          This change does not introduce new functionality, but simply re-arranges current tests to handle and check the changed instantiation of the CheckpointStreamFactory.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/StephanEwen/incubator-flink stream_factory

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3312.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3312



          githubbot ASF GitHub Bot added a comment -

          Github user uce commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3312#discussion_r101271401

          --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java ---
          @@ -513,7 +509,9 @@ public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
               /**
                * Calls {@link StreamCheckpointedOperator#restoreState(FSDataInputStream)} if
                * the operator implements this interface.
          -     */ @Deprecated
          +     */
          +    @Deprecated
          +
          --- End diff --

          empty line

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3312

          StephanEwen Stephan Ewen added a comment -

          Fixed in

          • 1.2.1 via d3435b05f9ca94b4b9bd32df22b49f3829287075
          • 1.3.0 via 04e6758abbadf39a12848a925e6e087e060bbe3a

            People

            • Assignee: Stephan Ewen
            • Reporter: Stephan Ewen