Flink / FLINK-6048

Asynchronous snapshots for heap-based operator state backends

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Implemented
    • Affects Version/s: 1.3.0
    • Fix Version/s: 1.3.0
    • Labels:
      None

      Description

      The synchronous checkpointing mechanism of heap-based operator state backends blocks element processing for the duration of the checkpoint.
      We could implement a heap-based operator state backend that allows for asynchronous checkpoints.
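
      As a hedged illustration of the idea (hypothetical class and method names, not Flink's actual API): the snapshot is split into a short synchronous phase that deep-copies the state, and an asynchronous phase that serializes the copy while element processing continues.

      ```java
      import java.util.ArrayList;
      import java.util.List;
      import java.util.concurrent.Callable;
      import java.util.concurrent.FutureTask;
      import java.util.concurrent.RunnableFuture;

      // Hypothetical sketch: split a snapshot into a synchronous copy phase and
      // an asynchronous write phase, as this issue proposes for heap-based
      // operator state. Class and method names are illustrative only.
      public class AsyncSnapshotSketch {

          private final List<String> state = new ArrayList<>();

          public void add(String element) {
              state.add(element);
          }

          // Sync phase: deep-copy the state so element processing can resume
          // immediately. Async phase (the returned future): serialize the copy,
          // in Flink's case to a checkpoint output stream.
          public RunnableFuture<List<String>> snapshot() {
              final List<String> deepCopy = new ArrayList<>(state); // eager copy, sync part
              return new FutureTask<>(new Callable<List<String>>() {
                  @Override
                  public List<String> call() {
                      // The real backend writes the copy to a filesystem here;
                      // this sketch just hands the copied state back.
                      return deepCopy;
                  }
              });
          }
      }
      ```

      Once `snapshot()` has returned, further mutations of the live state no longer affect the copy, which is what makes the asynchronous part safe.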

        Issue Links

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user StefanRRichter opened a pull request:

          https://github.com/apache/flink/pull/3536

          FLINK-6048 Asynchronous snapshots for heap-based OperatorStateBackend

          This PR introduces asynchronous snapshots for the heap-based `DefaultOperatorStateBackend`. Compared to the asynchronous snapshots for the heap-based keyed state backend, this implementation is rather simple and eagerly generates a deep in-memory copy of the state before running the asynchronous part of the snapshot that writes to the filesystem.

          Note that this PR should later sit on top of PR #3483 and piggyback on the async-flag that was introduced.

          Furthermore, we could have a follow-up that actually parallelizes checkpointing the different async backends in `AsyncCheckpointRunnable`. Previously, this was not needed because there have only been keyed state backends, so those have been the only async backends.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/StefanRRichter/flink async-opstatebackend

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3536.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3536


          commit ff0930066fd6f9a5d54c548eff73fc4f34141b6a
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2017-03-14T13:25:57Z

          FLINK-6048 Implement async snapshots for DefaultOperatorStateBackend

          commit f38fdf7524039f2d87a4594d275959d874c4a198
          Author: Stefan Richter <s.richter@data-artisans.com>
          Date: 2017-03-14T15:07:06Z

          Unit tests for FLINK-6048


          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3536

          CC @aljoscha @StephanEwen

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3536

          Rebased.

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3536#discussion_r113465977

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java —
          @@ -131,59 +169,134 @@ public void dispose() {
           
               @Override
               public RunnableFuture<OperatorStateHandle> snapshot(
          -            long checkpointId,
          -            long timestamp,
          -            CheckpointStreamFactory streamFactory,
          -            CheckpointOptions checkpointOptions) throws Exception {
          +            final long checkpointId,
          +            final long timestamp,
          +            final CheckpointStreamFactory streamFactory,
          +            final CheckpointOptions checkpointOptions) throws Exception {
          +
          +        final long syncStartTime = System.currentTimeMillis();
           
                   if (registeredStates.isEmpty()) {
                       return DoneFuture.nullValue();
                   }
           
          -        List<OperatorBackendSerializationProxy.StateMetaInfo<?>> metaInfoList =
          -                new ArrayList<>(registeredStates.size());
          -
          -        for (Map.Entry<String, PartitionableListState<?>> entry : registeredStates.entrySet()) {
          -            PartitionableListState<?> state = entry.getValue();
          -            OperatorBackendSerializationProxy.StateMetaInfo<?> metaInfo =
          -                    new OperatorBackendSerializationProxy.StateMetaInfo<>(
          -                            state.getName(),
          -                            state.getPartitionStateSerializer(),
          -                            state.getAssignmentMode());
          -            metaInfoList.add(metaInfo);
          -        }
          -
          -        Map<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData = new HashMap<>(registeredStates.size());
          -
          -        CheckpointStreamFactory.CheckpointStateOutputStream out = streamFactory.
          -                createCheckpointStateOutputStream(checkpointId, timestamp);
          -
          -        try {
          -            closeStreamOnCancelRegistry.registerClosable(out);
          -
          -            DataOutputView dov = new DataOutputViewStreamWrapper(out);
          -
          -            OperatorBackendSerializationProxy backendSerializationProxy =
          -                    new OperatorBackendSerializationProxy(metaInfoList);
          -
          -            backendSerializationProxy.write(dov);
          -
          -            dov.writeInt(registeredStates.size());
          -            for (Map.Entry<String, PartitionableListState<?>> entry : registeredStates.entrySet()) {
          -
          -                PartitionableListState<?> value = entry.getValue();
          -                long[] partitionOffsets = value.write(out);
          -                OperatorStateHandle.Mode mode = value.getAssignmentMode();
          -                writtenStatesMetaData.put(entry.getKey(), new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
          -            }
          -
          -            OperatorStateHandle handle = new OperatorStateHandle(writtenStatesMetaData, out.closeAndGetHandle());
          -
          -            return new DoneFuture<>(handle);
          -        } finally {
          -            closeStreamOnCancelRegistry.unregisterClosable(out);
          -            out.close();
          +        final Map<String, PartitionableListState<?>> registeredStatesDeepCopies =
          +                new HashMap<>(registeredStates.size());
          +
          +        // eagerly create deep copies of the list states in the sync phase, so that we can use them in the async writing
          +        for (Map.Entry<String, PartitionableListState<?>> entry : this.registeredStates.entrySet()) {
          +
          +            PartitionableListState<?> listState = entry.getValue();
          +            if (null != listState) {
          +                listState = listState.deepCopy();
          +            }
          +            registeredStatesDeepCopies.put(entry.getKey(), listState);
          +        }
          +
          +        // implementation of the async IO operation, based on FutureTask
          +        final AbstractAsyncIOCallable<OperatorStateHandle, CheckpointStreamFactory.CheckpointStateOutputStream> ioCallable =
          +            new AbstractAsyncIOCallable<OperatorStateHandle, CheckpointStreamFactory.CheckpointStateOutputStream>() {
          +
          +                AtomicBoolean open = new AtomicBoolean(false);
          +
          +                @Override
          +                public CheckpointStreamFactory.CheckpointStateOutputStream openIOHandle() throws Exception {
          +                    if (open.compareAndSet(false, true)) {
          +                        CheckpointStreamFactory.CheckpointStateOutputStream stream =
          +                                streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
          +                        try {
          +                            closeStreamOnCancelRegistry.registerClosable(stream);
          +                            return stream;
          +                        } catch (Exception ex) {
          +                            open.set(false);
          +                            throw ex;
          +                        }
          +                    } else {
          +                        throw new IOException("Operation already opened.");
          +                    }
          +                }
          +
          +                @Override
          +                public OperatorStateHandle performOperation() throws Exception {
          +                    long asyncStartTime = System.currentTimeMillis();
          +
          +                    final Map<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData =
          +                            new HashMap<>(registeredStatesDeepCopies.size());
          +
          +                    List<OperatorBackendSerializationProxy.StateMetaInfo<?>> metaInfoList =
          +                            new ArrayList<>(registeredStatesDeepCopies.size());
          +
          +                    for (Map.Entry<String, PartitionableListState<?>> entry :
          +                            registeredStatesDeepCopies.entrySet()) {
          +
          +                        PartitionableListState<?> state = entry.getValue();
          +                        OperatorBackendSerializationProxy.StateMetaInfo<?> metaInfo =
          +                                new OperatorBackendSerializationProxy.StateMetaInfo<>(
          +                                        state.getName(),
          +                                        state.getPartitionStateSerializer(),
          +                                        state.getAssignmentMode());
          +                        metaInfoList.add(metaInfo);
          +                    }
          +
          +                    CheckpointStreamFactory.CheckpointStateOutputStream out = getIoHandle();
          +                    DataOutputView dov = new DataOutputViewStreamWrapper(out);
          +
          +                    OperatorBackendSerializationProxy backendSerializationProxy =
          +                            new OperatorBackendSerializationProxy(metaInfoList);
          +
          +                    backendSerializationProxy.write(dov);
          +
          +                    dov.writeInt(registeredStatesDeepCopies.size());
          +
          +                    for (Map.Entry<String, PartitionableListState<?>> entry :
          +                            registeredStatesDeepCopies.entrySet()) {
          +
          +                        PartitionableListState<?> value = entry.getValue();
          +                        long[] partitionOffsets = value.write(out);
          +                        OperatorStateHandle.Mode mode = value.getAssignmentMode();
          +                        writtenStatesMetaData.put(
          +                                entry.getKey(),
          +                                new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
          +                    }
          +
          +                    if (open.compareAndSet(true, false)) {
          +
          +                        OperatorStateHandle operatorStateHandle =
          +                                new OperatorStateHandle(writtenStatesMetaData, out.closeAndGetHandle());
          +
          +                        if (asynchronousSnapshots) {
          +                            LOG.info("DefaultOperatorStateBackend snapshot ({}, asynchronous part) in thread {} took {} ms.",
          +                                    streamFactory, Thread.currentThread(), (System.currentTimeMillis() - asyncStartTime));
          +                        }
          +
          +                        return operatorStateHandle;
          +                    } else {
          +                        throw new IOException("Checkpoint stream already closed.");
          +                    }
          +                }
          +
          +                @Override
          +                public void done(boolean canceled) {
          +                    if (open.compareAndSet(true, false)) {
          +                        CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle();
          +                        if (null != stream) {
          +                            closeStreamOnCancelRegistry.unregisterClosable(stream);

          — End diff —

          Shouldn't we always try to remove the stream from the `ClosableRegistry` after the callable has been executed?

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3536#discussion_r113479735

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java —
          @@ -131,59 +169,134 @@ public void dispose() {
           
               @Override
               public RunnableFuture<OperatorStateHandle> snapshot(
          -            long checkpointId,
          -            long timestamp,
          -            CheckpointStreamFactory streamFactory,
          -            CheckpointOptions checkpointOptions) throws Exception {
          +            final long checkpointId,
          +            final long timestamp,
          +            final CheckpointStreamFactory streamFactory,
          +            final CheckpointOptions checkpointOptions) throws Exception {
          +
          +        final long syncStartTime = System.currentTimeMillis();
           
                   if (registeredStates.isEmpty()) {
                       return DoneFuture.nullValue();
                   }
           
          -        List<OperatorBackendSerializationProxy.StateMetaInfo<?>> metaInfoList =
          -                new ArrayList<>(registeredStates.size());
          -
          -        for (Map.Entry<String, PartitionableListState<?>> entry : registeredStates.entrySet()) {
          -            PartitionableListState<?> state = entry.getValue();
          -            OperatorBackendSerializationProxy.StateMetaInfo<?> metaInfo =
          -                    new OperatorBackendSerializationProxy.StateMetaInfo<>(
          -                            state.getName(),
          -                            state.getPartitionStateSerializer(),
          -                            state.getAssignmentMode());
          -            metaInfoList.add(metaInfo);
          -        }
          -
          -        Map<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData = new HashMap<>(registeredStates.size());
          -
          -        CheckpointStreamFactory.CheckpointStateOutputStream out = streamFactory.
          -                createCheckpointStateOutputStream(checkpointId, timestamp);
          -
          -        try {
          -            closeStreamOnCancelRegistry.registerClosable(out);
          -
          -            DataOutputView dov = new DataOutputViewStreamWrapper(out);
          -
          -            OperatorBackendSerializationProxy backendSerializationProxy =
          -                    new OperatorBackendSerializationProxy(metaInfoList);
          -
          -            backendSerializationProxy.write(dov);
          -
          -            dov.writeInt(registeredStates.size());
          -            for (Map.Entry<String, PartitionableListState<?>> entry : registeredStates.entrySet()) {
          -
          -                PartitionableListState<?> value = entry.getValue();
          -                long[] partitionOffsets = value.write(out);
          -                OperatorStateHandle.Mode mode = value.getAssignmentMode();
          -                writtenStatesMetaData.put(entry.getKey(), new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
          -            }
          -
          -            OperatorStateHandle handle = new OperatorStateHandle(writtenStatesMetaData, out.closeAndGetHandle());
          -
          -            return new DoneFuture<>(handle);
          -        } finally {
          -            closeStreamOnCancelRegistry.unregisterClosable(out);
          -            out.close();
          +        final Map<String, PartitionableListState<?>> registeredStatesDeepCopies =
          +                new HashMap<>(registeredStates.size());
          +
          +        // eagerly create deep copies of the list states in the sync phase, so that we can use them in the async writing
          +        for (Map.Entry<String, PartitionableListState<?>> entry : this.registeredStates.entrySet()) {
          +
          +            PartitionableListState<?> listState = entry.getValue();
          +            if (null != listState) {
          +                listState = listState.deepCopy();
          +            }
          +            registeredStatesDeepCopies.put(entry.getKey(), listState);
          +        }
          +
          +        // implementation of the async IO operation, based on FutureTask
          +        final AbstractAsyncIOCallable<OperatorStateHandle, CheckpointStreamFactory.CheckpointStateOutputStream> ioCallable =
          +            new AbstractAsyncIOCallable<OperatorStateHandle, CheckpointStreamFactory.CheckpointStateOutputStream>() {
          +
          +                AtomicBoolean open = new AtomicBoolean(false);
          +
          +                @Override
          +                public CheckpointStreamFactory.CheckpointStateOutputStream openIOHandle() throws Exception {
          +                    if (open.compareAndSet(false, true)) {
          +                        CheckpointStreamFactory.CheckpointStateOutputStream stream =
          +                                streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
          +                        try {
          +                            closeStreamOnCancelRegistry.registerClosable(stream);
          +                            return stream;
          +                        } catch (Exception ex) {
          +                            open.set(false);
          +                            throw ex;
          +                        }
          +                    } else {
          +                        throw new IOException("Operation already opened.");
          +                    }
          +                }
          +
          +                @Override
          +                public OperatorStateHandle performOperation() throws Exception {
          +                    long asyncStartTime = System.currentTimeMillis();
          +
          +                    final Map<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData =
          +                            new HashMap<>(registeredStatesDeepCopies.size());
          +
          +                    List<OperatorBackendSerializationProxy.StateMetaInfo<?>> metaInfoList =
          +                            new ArrayList<>(registeredStatesDeepCopies.size());
          +
          +                    for (Map.Entry<String, PartitionableListState<?>> entry :
          +                            registeredStatesDeepCopies.entrySet()) {
          +
          +                        PartitionableListState<?> state = entry.getValue();
          +                        OperatorBackendSerializationProxy.StateMetaInfo<?> metaInfo =
          +                                new OperatorBackendSerializationProxy.StateMetaInfo<>(
          +                                        state.getName(),
          +                                        state.getPartitionStateSerializer(),
          +                                        state.getAssignmentMode());
          +                        metaInfoList.add(metaInfo);
          +                    }
          +
          +                    CheckpointStreamFactory.CheckpointStateOutputStream out = getIoHandle();
          +                    DataOutputView dov = new DataOutputViewStreamWrapper(out);
          +
          +                    OperatorBackendSerializationProxy backendSerializationProxy =
          +                            new OperatorBackendSerializationProxy(metaInfoList);
          +
          +                    backendSerializationProxy.write(dov);
          +
          +                    dov.writeInt(registeredStatesDeepCopies.size());
          +
          +                    for (Map.Entry<String, PartitionableListState<?>> entry :
          +                            registeredStatesDeepCopies.entrySet()) {
          +
          +                        PartitionableListState<?> value = entry.getValue();
          +                        long[] partitionOffsets = value.write(out);
          +                        OperatorStateHandle.Mode mode = value.getAssignmentMode();
          +                        writtenStatesMetaData.put(
          +                                entry.getKey(),
          +                                new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
          +                    }
          +
          +                    if (open.compareAndSet(true, false)) {
          +
          +                        OperatorStateHandle operatorStateHandle =
          +                                new OperatorStateHandle(writtenStatesMetaData, out.closeAndGetHandle());
          +
          +                        if (asynchronousSnapshots) {
          +                            LOG.info("DefaultOperatorStateBackend snapshot ({}, asynchronous part) in thread {} took {} ms.",
          +                                    streamFactory, Thread.currentThread(), (System.currentTimeMillis() - asyncStartTime));
          +                        }
          +
          +                        return operatorStateHandle;
          +                    } else {
          +                        throw new IOException("Checkpoint stream already closed.");
          +                    }
          +                }
          +
          +                @Override
          +                public void done(boolean canceled) {
          +                    if (open.compareAndSet(true, false)) {
          +                        CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle();
          +                        if (null != stream) {
          +                            closeStreamOnCancelRegistry.unregisterClosable(stream);

          — End diff —

          I don't understand exactly what you mean. In line 283, the stream is unregistered. The stream is only created if the runnable is executed, in which case it is guaranteed to reach `done()`. So I think whenever a stream was created and registered, it is also unregistered and closed. Or am I missing something here?
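
          To illustrate the lifecycle being discussed (a hedged sketch with a simplified stand-in registry and hypothetical names, not Flink's actual `ClosableRegistry` API): a stream registered when the callable opens it is unregistered again in a finally-style cleanup that runs whether the operation completes normally or is canceled.

          ```java
          import java.io.Closeable;
          import java.io.IOException;
          import java.util.Collections;
          import java.util.IdentityHashMap;
          import java.util.Set;

          // Hypothetical sketch of the register/unregister lifecycle from the review
          // discussion. Names are illustrative, not Flink's real API.
          public class StreamLifecycleSketch {

              // Simplified stand-in for a registry of streams to close on cancellation.
              static class Registry {
                  private final Set<Closeable> open =
                          Collections.newSetFromMap(new IdentityHashMap<Closeable, Boolean>());
                  void register(Closeable c) { open.add(c); }
                  void unregister(Closeable c) { open.remove(c); }
                  int size() { return open.size(); }
              }

              static class NoOpStream implements Closeable {
                  public void close() {}
              }

              // Returns the number of streams still registered after the snapshot;
              // the cleanup in finally mirrors the role of done() in the PR.
              public static int runSnapshot(Registry registry) throws IOException {
                  NoOpStream stream = new NoOpStream();
                  registry.register(stream);          // like openIOHandle()
                  try {
                      // like performOperation(): snapshot data would be written here
                  } finally {
                      registry.unregister(stream);    // like done(): always reached
                      stream.close();
                  }
                  return registry.size();
              }
          }
          ```

          The invariant under discussion is exactly this: every registered stream is unregistered on every exit path, so nothing remains in the registry after the callable finishes.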

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter commented on the issue:

          https://github.com/apache/flink/pull/3536

          Thanks for the review @tillrohrmann. I addressed your comments concerning unregistration of streams. Will merge this now.

          githubbot ASF GitHub Bot added a comment -

          Github user StefanRRichter closed the pull request at:

          https://github.com/apache/flink/pull/3536


            People

            • Assignee:
              srichter Stefan Richter
              Reporter:
              srichter Stefan Richter
            • Votes:
              0
              Watchers:
              3

              Dates

              • Created:
                Updated:
                Resolved:
