Details

      Description

      Currently, the SharedStateRegistry deletes state handles that are no longer referenced while holding the registry's lock and on the main thread. We should use the CheckpointCoordinator's async IO executor to make this non-blocking.
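The following is a minimal sketch of the non-blocking pattern described above. It assumes a simplified registry keyed by `String`. A hypothetical `DisposableState` interface stands in for Flink's state handles. Neither `AsyncDisposingRegistry` nor `DisposableState` is Flink API.

The idea is that reference counts change under the lock, but the actual delete is submitted to an injected `Executor` only after the lock has been released.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;

/** Hypothetical stand-in for a shared state handle; only deletion matters here. */
interface DisposableState {
    void discardState() throws Exception;
}

/**
 * Sketch only (not Flink's SharedStateRegistry): reference counts are updated
 * under the registry lock, while deleting a handle whose count drops to zero
 * is handed to an Executor after the lock has been released.
 */
class AsyncDisposingRegistry {

    private final Map<String, Integer> refCounts = new HashMap<>();
    private final Map<String, DisposableState> handles = new HashMap<>();
    private final Executor disposalExecutor;

    AsyncDisposingRegistry(Executor disposalExecutor) {
        this.disposalExecutor = disposalExecutor;
    }

    void register(String key, DisposableState handle) {
        synchronized (this) {
            handles.putIfAbsent(key, handle);
            refCounts.merge(key, 1, Integer::sum);
        }
    }

    void unregister(String key) {
        DisposableState toDiscard = null;
        synchronized (this) {
            Integer count = refCounts.get(key);
            if (count == null) {
                return; // unknown key: nothing to release
            }
            if (count == 1) {
                // Last reference: remove the entry, but do NOT delete under the lock.
                refCounts.remove(key);
                toDiscard = handles.remove(key);
            } else {
                refCounts.put(key, count - 1);
            }
        }
        // Lock released: the potentially slow delete runs on the executor.
        if (toDiscard != null) {
            final DisposableState handle = toDiscard;
            disposalExecutor.execute(() -> {
                try {
                    handle.discardState();
                } catch (Exception e) {
                    // A real implementation would log this failure.
                    System.err.println("Failed to discard " + key + ": " + e);
                }
            });
        }
    }

    synchronized int referenceCount(String key) {
        return refCounts.getOrDefault(key, 0);
    }
}
```

Passing `Runnable::run` as the executor still deletes synchronously on the calling thread, although outside the lock. Passing an IO thread pool moves the delete off that thread entirely.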


          Activity

          githubbot ASF GitHub Bot added a comment -

          Github user shixiaogang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3870#discussion_r116172093

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java —
          @@ -18,91 +18,137 @@

           package org.apache.flink.runtime.state;

          +import org.apache.flink.runtime.concurrent.Executors;
           import org.apache.flink.util.Preconditions;
           import org.slf4j.Logger;
           import org.slf4j.LoggerFactory;

           import java.util.HashMap;
           import java.util.Map;
          +import java.util.Objects;
          +import java.util.concurrent.Executor;

           /**
            * A {@code SharedStateRegistry} will be deployed in the
          - * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator} to
          + * {@link org.apache.flink.runtime.checkpoint.CompletedCheckpointStore} to
            * maintain the reference count of {@link SharedStateHandle}s which are shared
          - * among different checkpoints.
          - *
          + * among different incremental checkpoints.
            */
           public class SharedStateRegistry {

               private static final Logger LOG = LoggerFactory.getLogger(SharedStateRegistry.class);

               /** All registered state objects by an artificial key */
          -    private final Map<String, SharedStateRegistry.SharedStateEntry> registeredStates;
          +    private final Map<SharedStateRegistryKey, SharedStateRegistry.SharedStateEntry> registeredStates;
          +
          +    /** Executor for async state deletion */
          +    private final Executor asyncDisposalExecutor;

               public SharedStateRegistry() {
                   this.registeredStates = new HashMap<>();
          +        this.asyncDisposalExecutor = Executors.directExecutor(); //TODO: FLINK-6534
          — End diff –

          I prefer not to use another asynchronous executor here.

          In my initial implementation of `SharedStateRegistry`, unreferenced shared states are not discarded immediately. Instead, they are returned in a list and then discarded outside the synchronization scope. Completed checkpoints are already discarded in an asynchronous thread in the `ZookeeperCompletedCheckpointStore`, which is the store more commonly used in practice. Given that, we can avoid using another asynchronous executor here.

          What do you think?
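The list-returning alternative proposed in this comment might look roughly like the following sketch. All names (`ListReturningRegistry`, `DiscardableState`, `CheckpointDiscarder`) are hypothetical.

The point is that the registry never deletes anything itself. The caller deletes the returned handles after the registry lock has been released. That caller could be, for example, the thread that already discards completed checkpoints asynchronously.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a shared state handle. */
interface DiscardableState {
    void discardState() throws Exception;
}

/**
 * Sketch of the reviewer's alternative: only reference counts are updated under
 * the lock, and handles that became unreferenced are returned to the caller.
 */
class ListReturningRegistry {

    private final Map<String, Integer> refCounts = new HashMap<>();
    private final Map<String, DiscardableState> handles = new HashMap<>();

    synchronized void register(String key, DiscardableState handle) {
        handles.putIfAbsent(key, handle);
        refCounts.merge(key, 1, Integer::sum);
    }

    /** Decrements every given key and returns the handles whose count reached zero. */
    synchronized List<DiscardableState> unregisterAll(Collection<String> keys) {
        List<DiscardableState> unreferenced = new ArrayList<>();
        for (String key : keys) {
            Integer count = refCounts.get(key);
            if (count == null) {
                continue; // unknown key: nothing to release
            }
            if (count == 1) {
                refCounts.remove(key);
                unreferenced.add(handles.remove(key));
            } else {
                refCounts.put(key, count - 1);
            }
        }
        return unreferenced;
    }
}

/** Hypothetical caller, e.g. code that discards a subsumed completed checkpoint. */
final class CheckpointDiscarder {

    private CheckpointDiscarder() {}

    /** Runs on the caller's own thread; returns the number of failed deletions. */
    static int discardCheckpoint(ListReturningRegistry registry, Collection<String> sharedKeys) {
        // The registry lock is held only inside unregisterAll.
        List<DiscardableState> toDelete = registry.unregisterAll(sharedKeys);
        int failures = 0;
        for (DiscardableState state : toDelete) {
            try {
                state.discardState(); // lock already released here
            } catch (Exception e) {
                failures++; // a real implementation would log the failure
            }
        }
        return failures;
    }
}
```

The trade-off of this design is that every caller must remember to discard the returned handles. A caller that drops the list leaks those states.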

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3870#discussion_r116179467

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java —
          @@ -18,91 +18,137 @@

               public SharedStateRegistry() {
                   this.registeredStates = new HashMap<>();
          +        this.asyncDisposalExecutor = Executors.directExecutor(); //TODO: FLINK-6534
          — End diff –

          I totally agree that there should not be a new executor; that is why I marked it with the TODO. This is just preparation for the full fix of FLINK-6534. My plan for the full fix is to pass the IO executor from the `CompletedCheckpointStore` and use it inside the registry, outside of any synchronization. For now, this code is a working placeholder for the full fix, which I will do as a follow-up.
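The following is a sketch of the planned shape. It assumes the owning store simply hands its IO executor to the registry through a constructor. `ExecutorInjectedRegistry` and `CheckpointStoreSketch` are hypothetical names, not Flink classes.

The no-arg constructor keeps the placeholder behaviour from the diff. To stay self-contained, it uses `Runnable::run`, which does what a direct executor does: it runs the task on the calling thread.

```java
import java.util.Objects;
import java.util.concurrent.Executor;

/**
 * Sketch of constructor injection: the owner (per the comment above, the
 * CompletedCheckpointStore) supplies its IO executor, and the no-arg
 * constructor falls back to running disposal on the calling thread.
 */
class ExecutorInjectedRegistry {

    /** Behaves like a direct executor: runs the task immediately on the caller's thread. */
    static final Executor DIRECT = Runnable::run;

    private final Executor asyncDisposalExecutor;

    ExecutorInjectedRegistry() {
        this(DIRECT); // placeholder, analogous to Executors.directExecutor() in the diff
    }

    ExecutorInjectedRegistry(Executor asyncDisposalExecutor) {
        this.asyncDisposalExecutor = Objects.requireNonNull(asyncDisposalExecutor);
    }

    /** Hands a deletion task to the configured executor; meant to be called after the lock is released. */
    void scheduleDisposal(Runnable deletion) {
        asyncDisposalExecutor.execute(deletion);
    }
}

/** Hypothetical owner standing in for a CompletedCheckpointStore that has an IO executor. */
class CheckpointStoreSketch {

    private final ExecutorInjectedRegistry registry;

    CheckpointStoreSketch(Executor ioExecutor) {
        // The store passes its own IO executor instead of the registry creating one.
        this.registry = new ExecutorInjectedRegistry(ioExecutor);
    }

    ExecutorInjectedRegistry registry() {
        return registry;
    }
}
```

With a real IO executor, `scheduleDisposal` returns immediately and the deletion runs later on a pool thread. With the placeholder, behaviour is unchanged from synchronous deletion.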

          srichter Stefan Richter added a comment -

          Fixed in 44fb035e02.


            People

            • Assignee: srichter Stefan Richter
            • Reporter: srichter Stefan Richter
            • Votes: 0
            • Watchers: 2
