Flink / FLINK-6390

Add Trigger Hooks to the Checkpoint Coordinator

    Details

      Description

      Some source systems need to be notified before a checkpoint starts, so that they can do preparatory work for it.

      I propose to add an interface that allows sources to register hooks, which the checkpoint coordinator calls when triggering or restoring a checkpoint.
      These hooks may produce state that is stored with the checkpoint metadata.
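To make the coordinator side of this proposal concrete, here is a minimal sketch under stated assumptions: `TriggerHook`, `HookState`, and `HookTrigger` are hypothetical stand-in names (not Flink classes), `java.util.concurrent.CompletableFuture` stands in for the future type, and `IllegalStateException` stands in for Flink's own exception type. It follows the same flow as `MasterHooks.triggerMasterHooks` in the pull request further down: call each hook before the sources are triggered, wait for its future with a timeout, and keep every non-null result under the hook's identifier.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

/** Simplified stand-in for the proposed hook interface (serializer and restore omitted). */
interface TriggerHook<T> {
    String getIdentifier();

    /** May return null if the hook has nothing to store. */
    CompletableFuture<T> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception;
}

/** Data a hook produced, kept with the checkpoint metadata under the hook's identifier. */
final class HookState {
    final String identifier;
    final Object data;

    HookState(String identifier, Object data) {
        this.identifier = identifier;
        this.data = data;
    }
}

final class HookTrigger {

    /**
     * Calls every hook before the "trigger checkpoint" messages would go to the sources and
     * collects the non-null results. Any failure or timeout aborts the checkpoint.
     */
    static List<HookState> triggerAll(
            List<TriggerHook<?>> hooks, long checkpointId, long timestamp,
            Executor executor, long timeoutMillis) {

        List<HookState> states = new ArrayList<>();
        for (TriggerHook<?> hook : hooks) {
            String id = hook.getIdentifier();
            Object result;
            try {
                CompletableFuture<?> future = hook.triggerCheckpoint(checkpointId, timestamp, executor);
                if (future == null) {
                    continue; // stateless hook: nothing to wait for or store
                }
                result = future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt status
                throw new IllegalStateException("Hook '" + id + "' was interrupted", e);
            } catch (Exception e) {
                // covers exceptions thrown by the hook, failed futures, and timeouts
                throw new IllegalStateException("Hook '" + id + "' failed; aborting checkpoint", e);
            }
            if (result != null) {
                states.add(new HookState(id, result));
            }
        }
        return states;
    }
}
```

Blocking on each future in turn keeps the sketch simple; the pull request's own code notes that it intends to make this step asynchronous later.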

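      Envisioned interface for the hooks:

      package org.apache.flink.runtime.checkpoint;

      import org.apache.flink.core.io.SimpleVersionedSerializer;
      import org.apache.flink.runtime.concurrent.Future;

      import javax.annotation.Nullable;

      import java.util.concurrent.Executor;

      /**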
       * The interface for hooks that can be called by the checkpoint coordinator when triggering or
       * restoring a checkpoint. Such a hook is useful for example when preparing external systems for
       * taking or restoring checkpoints.
       * 
       * <p>The {@link #triggerCheckpoint(long, long, Executor)} method (called when triggering a checkpoint)
       * can return a result (via a future) that will be stored as part of the checkpoint metadata.
       * When restoring a checkpoint, that stored result will be given to the {@link #restoreCheckpoint(long, Object)}
       * method. The hook's {@link #getIdentifier() identifier} is used to map data to the right hook in the presence
       * of multiple hooks, and when resuming a savepoint that was potentially created by a different job.
       * The identifier has a similar role as for example the operator UID in the streaming API.
       * 
       * <p>The MasterTriggerRestoreHook is defined when creating the streaming dataflow graph. It is attached
       * to the job graph, which gets sent to the cluster for execution. To avoid having to make the hook
       * itself serializable, these hooks are attached to the job graph via a {@link MasterTriggerRestoreHook.Factory}.
       * 
       * @param <T> The type of the data produced by the hook and stored as part of the checkpoint metadata.
       *            If the hook never stores any data, this can be typed to {@code Void}.
       */
      public interface MasterTriggerRestoreHook<T> {
      
      	/**
      	 * Gets the identifier of this hook. The identifier is used to identify a specific hook in the
      	 * presence of multiple hooks and to give it the correct checkpointed data upon checkpoint restoration.
      	 * 
      	 * <p>The identifier should be unique between different hooks of a job, but deterministic/constant
      	 * so that upon resuming a savepoint, the hook will get the correct data.
      	 * For example, if the hook calls into another storage system and persists namespace/schema specific
      	 * information, then the name of the storage system, together with the namespace/schema name could
      	 * be an appropriate identifier.
      	 * 
      	 * <p>When multiple hooks with the same identifier are created and attached to a job graph, only the first
      	 * one is actually used. This can be exploited to deduplicate hooks that would do the same thing.
      	 * 
      	 * @return The identifier of the hook. 
      	 */
      	String getIdentifier();
      
      	/**
      	 * This method is called by the checkpoint coordinator when triggering a checkpoint, prior
      	 * to sending the "trigger checkpoint" messages to the source tasks.
      	 * 
      	 * <p>If the hook implementation wants to store data as part of the checkpoint, it may return
      	 * that data via a future, otherwise it should return null. The data is stored as part of
      	 * the checkpoint metadata under the hook's identifier (see {@link #getIdentifier()}).
      	 * 
      	 * <p>If the action by this hook needs to be executed synchronously, then this method should
      	 * directly execute the action synchronously and block until it is complete. The returned future
      	 * (if any) would typically be a completed future.
      	 * 
      	 * <p>If the action should be executed asynchronously and only needs to complete before the
      	 * checkpoint is considered completed, then the method may use the given executor to execute the
      	 * actual action and would signal its completion by completing the future. For hooks that do not
      	 * need to store data, the future would be completed with null.
      	 * 
      	 * @param checkpointId The ID (logical timestamp, monotonically increasing) of the checkpoint
      	 * @param timestamp The wall clock timestamp when the checkpoint was triggered, for
      	 *                  info/logging purposes. 
      	 * @param executor The executor for asynchronous actions
      	 * 
      	 * @return Optionally, a future that signals when the hook has completed and that contains
      	 *         data to be stored with the checkpoint.
      	 * 
      	 * @throws Exception Exceptions encountered when calling the hook will cause the checkpoint to abort.
      	 */
      	@Nullable
      	Future<T> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception;
      
      	/**
      	 * This method is called by the checkpoint coordinator prior to restoring the state of a checkpoint.
      	 * If the checkpoint did store data from this hook, that data will be passed to this method. 
      	 * 
      	 * @param checkpointId The ID (logical timestamp) of the restored checkpoint
      	 * @param checkpointData The data originally stored in the checkpoint by this hook, possibly null. 
      	 * 
      	 * @throws Exception Exceptions thrown while restoring the checkpoint will cause the restore
      	 *                   operation to fail and to possibly fall back to another checkpoint. 
      	 */
      	void restoreCheckpoint(long checkpointId, @Nullable T checkpointData) throws Exception;
      
      	/**
      	 * Creates the serializer that (de)serializes the data stored by this hook. The serializer
      	 * serializes the result of the Future returned by the {@link #triggerCheckpoint(long, long, Executor)}
      	 * method, and deserializes the data stored in the checkpoint into the object passed to the
      	 * {@link #restoreCheckpoint(long, Object)} method. 
      	 * 
      	 * <p>If the hook never returns any data to be stored, then this method may return null as the
      	 * serializer.
      	 * 
      	 * @return The serializer that (de)serializes the data stored by this hook
      	 */
      	@Nullable
      	SimpleVersionedSerializer<T> createCheckpointDataSerializer();
      }
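To illustrate the synchronous and asynchronous variants described in the `triggerCheckpoint` Javadoc, here is a minimal sketch of a hook. It assumes a simplified stand-in interface `Hook` that uses `java.util.concurrent.CompletableFuture` in place of the proposal's `Future` and omits `createCheckpointDataSerializer()`; `PreparePositionHook` and its simulated external system are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Simplified stand-in for MasterTriggerRestoreHook (serializer method omitted). */
interface Hook<T> {
    String getIdentifier();

    CompletableFuture<T> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception;

    void restoreCheckpoint(long checkpointId, T checkpointData) throws Exception;
}

/**
 * Hypothetical hook that asks an external system to prepare for a checkpoint and stores the
 * position it reports. The external system is simulated by a counter.
 */
final class PreparePositionHook implements Hook<Long> {
    private final boolean async;
    private long position;   // simulated state of the external system
    Long restoredData;       // what the last restore received

    PreparePositionHook(boolean async) {
        this.async = async;
    }

    @Override
    public String getIdentifier() {
        // deterministic and unique within the job: system name plus namespace
        return "external-log:namespace-a";
    }

    @Override
    public CompletableFuture<Long> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) {
        if (!async) {
            // synchronous action: do the work now, hand back an already-completed future
            return CompletableFuture.completedFuture(prepare());
        }
        // asynchronous action: run on the given executor; the checkpoint is only
        // considered complete once this future completes
        return CompletableFuture.supplyAsync(this::prepare, executor);
    }

    @Override
    public synchronized void restoreCheckpoint(long checkpointId, Long checkpointData) {
        // checkpointData is null if the checkpoint holds nothing for this hook
        restoredData = checkpointData;
        position = checkpointData == null ? 0L : checkpointData;
    }

    private synchronized long prepare() {
        position += 10; // pretend the external system advanced and reported its position
        return position;
    }
}
```

The identifier combines a system name and a namespace, following the Javadoc's advice that it be unique per job but stable across runs so a resumed savepoint hands the data back to the same hook.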
      

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user StephanEwen opened a pull request:

          https://github.com/apache/flink/pull/3782

          FLINK-6390 [checkpoints] Add API for checkpoints that are triggered via external systems

          Some source systems need to be notified before a checkpoint starts, so that they can do preparatory work for it.

          This PR adds an interface that allows sources to register hooks, which the checkpoint coordinator calls when triggering or restoring a checkpoint. These hooks may produce state that is stored with the checkpoint metadata.

          Because this changes the checkpoint metadata format, the commit introduces a new metadata format version.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/StephanEwen/incubator-flink trigger_via_source

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3782.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3782


          commit 3332363d50a6a1a78435bacecf3f75cdc54f5bfa
          Author: Stephan Ewen <sewen@apache.org>
          Date: 2017-03-27T15:20:47Z

          FLINK-6390 [checkpoints] Add API for checkpoints that are triggered via external systems

          This includes

          • An interface for hooks that are called by the checkpoint coordinator to trigger/restore a checkpoint
          • A source extension that triggers the operator checkpoints and barrier injection on certain events

          Because this changes the checkpoint metadata format, the commit introduces a new metadata format version.


          gyfora Gyula Fora added a comment -

          Hi Stephan,

          This looks pretty useful. One thing that came to mind is whether it makes sense to add a hook that is called when all tasks have completed their local snapshot, but before the full snapshot is completed (for instance, to implement two-phase commit logic, which could be used by backends that make the data visible externally).

          Gyula

          gyfora Gyula Fora added a comment -

          we could call it completeCheckpoint for example
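The suggested extension could look roughly like the following. This is a hypothetical sketch only: `completeCheckpoint` is the name floated in this comment and is not part of the interface in the issue, and `TwoPhaseHook`, `StagingHook`, and `ToyCoordinator` are made-up names. Futures and serializers are left out; the point is the ordering, where phase 2 runs only after every task has acknowledged its local snapshot.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical hook with the extra callback suggested in the comment. */
interface TwoPhaseHook {
    String getIdentifier();

    /** Phase 1: called when the checkpoint is triggered (e.g. stage / pre-commit data). */
    void triggerCheckpoint(long checkpointId);

    /** Phase 2 (hypothetical): all tasks have completed their local snapshot. */
    void completeCheckpoint(long checkpointId);
}

/** Records which checkpoints were staged and which were committed. */
final class StagingHook implements TwoPhaseHook {
    final List<Long> staged = new ArrayList<>();
    final List<Long> committed = new ArrayList<>();

    @Override
    public String getIdentifier() {
        return "staging-store";
    }

    @Override
    public void triggerCheckpoint(long checkpointId) {
        staged.add(checkpointId);     // write data somewhere invisible to readers
    }

    @Override
    public void completeCheckpoint(long checkpointId) {
        committed.add(checkpointId);  // make the staged data visible
    }
}

/** Toy coordinator: only calls phase 2 once every task has acknowledged. */
final class ToyCoordinator {
    static boolean runCheckpoint(List<TwoPhaseHook> hooks, long checkpointId, int tasks, int acknowledged) {
        for (TwoPhaseHook hook : hooks) {
            hook.triggerCheckpoint(checkpointId);
        }
        if (acknowledged < tasks) {
            return false; // some local snapshot is missing: never commit
        }
        for (TwoPhaseHook hook : hooks) {
            hook.completeCheckpoint(checkpointId);
        }
        return true;
    }
}
```

A real two-phase commit would also need a way to discard staged data when a checkpoint is aborted; that part is left out here.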

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3782#discussion_r113504901

          — Diff: flink-core/src/main/java/org/apache/flink/core/io/SimpleVersionedSerializer.java —
          @@ -0,0 +1,80 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.core.io;
          +
          +import java.io.IOException;
          +
          +/**
          + * A simple serializer interface for versioned serialization.
          + *
          + * <p>The serializer has a version (returned by {@link #getVersion()}) which can be attached
          + * to the serialized data. When the serializer evolves, the version can be used to identify
          + * with which prior version the data was serialized.
          + *
          + * <pre>{@code
          + * MyType someObject = ...;
          + * SimpleVersionedSerializer<MyType> serializer = ...;
          + *
          + * byte[] serializedData = serializer.serialize(someObject);
          + * int version = serializer.getVersion();
          + *
          + * MyType deserialized = serializer.deserialize(version, serializedData);
          + *
          + * byte[] someOldData = ...;
          + * int oldVersion = ...;
          + * MyType deserializedOldObject = serializer.deserialize(oldVersion, someOldData);
          + *
          + * }</pre>
          + *
          + * @param <E> The data type serialized / deserialized by this serializer.
          + */
          +public interface SimpleVersionedSerializer<E> extends Versioned {
          +
          + /**
          + * Gets the version with which this serializer serializes.
          + *
          + * @return The version of the serialization schema.
          + */
          + @Override
          + int getVersion();
          +
          + /**
          + * Serializes the given object. The serialization is assumed to correspond to the
          + * current serialization version (as returned by {@link #getVersion()}.
          + *
          + *
          + * @param checkpointData The object to serialize.
          + * @return The serialized data (bytes).
          + *
          + * @throws IOException Thrown, if the serialization fails.
          + */
          + byte[] serialize(E checkpointData) throws IOException;
          — End diff –

          `checkpointData` is perhaps too specific a parameter name.
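To show how the version number in this interface lets old data stay readable, here is a minimal sketch. It uses a local copy of the interface shape from the diff (`VersionedSerializer`, without the `Versioned` super-interface) so it compiles on its own; `PositionSerializer` is hypothetical, and its "version 1 wrote a 4-byte int" history is an assumed example format. It also uses a neutral parameter name (`value`) in line with the review comment.

```java
import java.io.IOException;
import java.nio.ByteBuffer;

/** Local copy of the interface shape in the diff, without the Versioned super-interface. */
interface VersionedSerializer<E> {
    int getVersion();

    byte[] serialize(E value) throws IOException;

    E deserialize(int version, byte[] serialized) throws IOException;
}

/**
 * Hypothetical serializer for a position. Version 1 (an assumed older format) wrote a 4-byte
 * int, version 2 writes an 8-byte long. Old bytes remain readable because the version number
 * is stored next to them.
 */
final class PositionSerializer implements VersionedSerializer<Long> {
    static final int CURRENT_VERSION = 2;

    @Override
    public int getVersion() {
        return CURRENT_VERSION;
    }

    @Override
    public byte[] serialize(Long value) {
        // always writes the current (version 2) format
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    @Override
    public Long deserialize(int version, byte[] serialized) throws IOException {
        ByteBuffer buffer = ByteBuffer.wrap(serialized);
        switch (version) {
            case 1:
                return (long) buffer.getInt(); // widen the old int format
            case 2:
                return buffer.getLong();
            default:
                throw new IOException("Unrecognized serialization version: " + version);
        }
    }
}
```

The caller stores `getVersion()` next to the bytes (as `MasterState` does in the pull request) and passes it back to `deserialize`, so the serializer never has to guess which format it is reading.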

          githubbot ASF GitHub Bot added a comment -

          Github user tillrohrmann commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3782#discussion_r113507485

          — Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java —
          @@ -0,0 +1,272 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.runtime.checkpoint.hooks;
          +
          +import org.apache.flink.api.common.time.Time;
          +import org.apache.flink.api.java.tuple.Tuple2;
          +import org.apache.flink.core.io.SimpleVersionedSerializer;
          +import org.apache.flink.runtime.checkpoint.MasterState;
          +import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
          +import org.apache.flink.runtime.concurrent.Future;
          +import org.apache.flink.util.ExceptionUtils;
          +import org.apache.flink.util.FlinkException;
          +import org.slf4j.Logger;
          +
          +import java.util.ArrayList;
          +import java.util.Collection;
          +import java.util.LinkedHashMap;
          +import java.util.List;
          +import java.util.Map;
          +import java.util.concurrent.ExecutionException;
          +import java.util.concurrent.Executor;
          +import java.util.concurrent.TimeoutException;
          +
          +/**
          + * Collection of methods to deal with checkpoint master hooks.
          + */
          +public class MasterHooks {
          +
          + // ------------------------------------------------------------------------
          + // checkpoint triggering
          + // ------------------------------------------------------------------------
          +
          + /**
          + * Triggers all given master hooks and returns state objects for each hook that
          + * produced a state.
          + *
          + * @param hooks The hooks to trigger
          + * @param checkpointId The checkpoint ID of the triggering checkpoint
          + * @param timestamp The (informational) timestamp for the triggering checkpoint
          + * @param executor An executor that can be used for asynchronous I/O calls
          + * @param timeout The maximum time that a hook may take to complete
          + *
          + * @return A list containing all states produced by the hooks
          + *
          + * @throws FlinkException Thrown, if the hooks throw an exception, or the state+
          + * deserialization fails.
          + */
          + public static List<MasterState> triggerMasterHooks(
          + Collection<MasterTriggerRestoreHook<?>> hooks,
          + long checkpointId,
          + long timestamp,
          + Executor executor,
          + Time timeout) throws FlinkException {
          +
          + final ArrayList<MasterState> states = new ArrayList<>(hooks.size());
          +
          + for (MasterTriggerRestoreHook<?> hook : hooks) {
          + MasterState state = triggerHook(hook, checkpointId, timestamp, executor, timeout);
          + if (state != null) {
          + states.add(state);
          + }
          + }
          +
          + states.trimToSize();
          + return states;
          + }
          +
          + private static <T> MasterState triggerHook(
          + MasterTriggerRestoreHook<?> hook,
          + long checkpointId,
          + long timestamp,
          + Executor executor,
          + Time timeout) throws FlinkException {
          +
          + @SuppressWarnings("unchecked")
          + final MasterTriggerRestoreHook<T> typedHook = (MasterTriggerRestoreHook<T>) hook;
          +
          + final String id = typedHook.getIdentifier();
          + final SimpleVersionedSerializer<T> serializer = typedHook.createCheckpointDataSerializer();
          +
          + // call the hook!
          + final Future<T> resultFuture;
          + try {
          + resultFuture = typedHook.triggerCheckpoint(checkpointId, timestamp, executor);
          + }
          + catch (Throwable t) {
          + ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
          + throw new FlinkException("Error while triggering checkpoint master hook '" + id + '\'', t);
          + }
          +
          + // is there is a result future, wait for its completion
          + // in the future we want to make this asynchronous with futures (no pun intended)
          + if (resultFuture == null) {
          + return null;
          + }
          + else {
          + final T result;
          + try {
          + result = resultFuture.get(timeout.getSize(), timeout.getUnit());
          + }
          + catch (InterruptedException e) {
          + // cannot continue here - restore interrupt status and leave
          + Thread.currentThread().interrupt();
          + throw new FlinkException("Checkpoint master hook was interrupted");
          + }
          + catch (ExecutionException e) {
          + throw new FlinkException("Checkpoint master hook '" + id + "' produced an exception", e.getCause());
          + }
          + catch (TimeoutException e) {
          + throw new FlinkException("Checkpoint master hook '" + id +
          + "' did not complete in time (" + timeout + ')');
          + }
          +
          + // if the result of the future is not null, return it as state
          + if (result == null) {
          + return null;
          + }
          + else if (serializer != null) {
          + try {
          + final int version = serializer.getVersion();
          + final byte[] bytes = serializer.serialize(result);
          +
          + return new MasterState(id, bytes, version);
          + }
          + catch (Throwable t) {
          + ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
          + throw new FlinkException("Failed to serialize state of master hook '" + id + '\'', t);
          + }
          + }
          + else {
          + throw new FlinkException("Checkpoint hook '" + id + " is stateful but creates no serializer");
          + }
          + }
          + }
          +
          + // ------------------------------------------------------------------------
          + // checkpoint restoring
          + // ------------------------------------------------------------------------
          +
          + /**
          + * Calls the restore method given checkpoint master hooks and passes the given master
          + * state to them where state with a matching name is found.
          + *
          + * <p>If state is found and no hook with the same name is found, the method throws an
          + * exception, unless the {@code allowUnmatchedState} flag is set.
          + *
          + * @param masterHooks The hooks to call restore on
          + * @param states The state to pass to the hooks
          + * @param checkpointId The checkpoint ID of the restored checkpoint
          + * @param allowUnmatchedState True,
          + * @param log The logger for log messages
          + *
          + * @throws FlinkException Thrown, if the hooks throw an exception, or the state+
          + * deserialization fails.
          + */
          + public static void restoreMasterHooks(
          + final Map<String, MasterTriggerRestoreHook<?>> masterHooks,
          + final List<MasterState> states,
          + final long checkpointId,
          + final boolean allowUnmatchedState,
          + final Logger log) throws FlinkException {
          +
          + // early out
          + if (states == null || states.isEmpty() || masterHooks == null || masterHooks.isEmpty()) {
          + log.info("No master state to restore");
          + return;
          + }
          +
          + log.info("Calling master restore hooks");
          +
          + // collect the hooks
          + final LinkedHashMap<String, MasterTriggerRestoreHook<?>> allHooks = new LinkedHashMap<>(masterHooks);
          +
          + // first, deserialize all hook state
          + final ArrayList<Tuple2<MasterTriggerRestoreHook<?>, Object>> hooksAndStates = new ArrayList<>();
          +
          + for (MasterState state : states) {
          + if (state != null) {
          + final String name = state.name();
          + final MasterTriggerRestoreHook<?> hook = allHooks.remove(name);
          +
          + if (hook != null) {
          + log.debug("Found state to restore for hook '{}'", name);
          +
          + Object deserializedState = deserializeState(state, hook);
          + hooksAndStates.add(new Tuple2<MasterTriggerRestoreHook<?>, Object>(hook, deserializedState));
          + }
          + else if (!allowUnmatchedState) {
          + throw new IllegalStateException("Found state '" + state.name() +
          + "' which is not resumed by any hook.");
          + }
          + else {
          + log.info("Dropping unmatched state from '{}'", name);
          + }
          + }
          + }
          +
          + // now that all is deserialized, call the hooks
          + for (Tuple2<MasterTriggerRestoreHook<?>, Object> hookAndState : hooksAndStates) {
          + restoreHook(hookAndState.f1, hookAndState.f0, checkpointId);
          + }
          +
          + // trigger the remaining hooks without checkpointed state
          + for (MasterTriggerRestoreHook<?> hook : allHooks.values()) {
          + restoreHook(null, hook, checkpointId);
          + }
          + }
          +
          + private static <T> T deserializeState(MasterState state, MasterTriggerRestoreHook<?> hook) throws FlinkException {
          + @SuppressWarnings("unchecked")
          + final MasterTriggerRestoreHook<T> typedHook = (MasterTriggerRestoreHook<T>) hook;
          + final String id = hook.getIdentifier();
          +
          + try {
          + final SimpleVersionedSerializer<T> deserializer = typedHook.createCheckpointDataSerializer();
          + if (deserializer == null) {
          + throw new FlinkException("null serializer for state of hook " + hook.getIdentifier());
          + }
          +
          + return deserializer.deserialize(state.version(), state.bytes());
          + }
          + catch (Throwable t) {
          + throw new FlinkException("Cannot deserialize state for master hook '" + id + '\'');
          — End diff –

          exception is swallowed here
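The restore flow in this diff, with the deserialization failure kept as the cause rather than dropped, might look like the following hedged sketch. It uses simplified stand-ins: `RestoreHook`, `StoredState`, `StringHook`, and `HookRestorer` are hypothetical names, deserialization is folded into the hook instead of a separate serializer, and `IllegalStateException` stands in for `FlinkException`. It keeps the diff's ordering: deserialize all matched state first, reject unmatched state unless `allowUnmatchedState` is set, then restore the remaining hooks with `null`.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified stand-in for a master hook, with deserialization folded into the hook. */
interface RestoreHook {
    String getIdentifier();
    Object deserialize(int version, byte[] bytes) throws Exception;
    void restoreCheckpoint(long checkpointId, Object data) throws Exception;
}

/** Stand-in for MasterState: hook name, serializer version, serialized bytes. */
final class StoredState {
    final String name;
    final int version;
    final byte[] bytes;

    StoredState(String name, int version, byte[] bytes) {
        this.name = name;
        this.version = version;
        this.bytes = bytes;
    }
}

/** Hypothetical hook storing a UTF-8 string; only format version 1 exists. */
final class StringHook implements RestoreHook {
    private final String id;
    String restored = "<not restored>";

    StringHook(String id) { this.id = id; }

    @Override
    public String getIdentifier() { return id; }

    @Override
    public Object deserialize(int version, byte[] bytes) throws IOException {
        if (version != 1) {
            throw new IOException("Unknown version " + version);
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    @Override
    public void restoreCheckpoint(long checkpointId, Object data) { restored = (String) data; }
}

final class HookRestorer {
    static void restoreAll(Map<String, RestoreHook> hooks, List<StoredState> states,
                           long checkpointId, boolean allowUnmatchedState) {
        if (states.isEmpty() || hooks.isEmpty()) {
            return; // same early-out as in the diff
        }
        Map<String, RestoreHook> remaining = new LinkedHashMap<>(hooks);
        List<Map.Entry<RestoreHook, Object>> toRestore = new ArrayList<>();

        // deserialize everything first, so a bad state fails before any hook is restored
        for (StoredState state : states) {
            RestoreHook hook = remaining.remove(state.name);
            if (hook == null) {
                if (!allowUnmatchedState) {
                    throw new IllegalStateException(
                            "Found state '" + state.name + "' which is not resumed by any hook.");
                }
                continue; // drop unmatched state
            }
            try {
                toRestore.add(new AbstractMap.SimpleEntry<>(hook, hook.deserialize(state.version, state.bytes)));
            } catch (Exception e) {
                // keep the original failure as the cause instead of swallowing it
                throw new IllegalStateException(
                        "Cannot deserialize state for master hook '" + state.name + "'", e);
            }
        }
        try {
            for (Map.Entry<RestoreHook, Object> entry : toRestore) {
                entry.getKey().restoreCheckpoint(checkpointId, entry.getValue());
            }
            for (RestoreHook hook : remaining.values()) {
                hook.restoreCheckpoint(checkpointId, null); // hooks without checkpointed state
            }
        } catch (Exception e) {
            throw new IllegalStateException("Master hook failed to restore checkpoint " + checkpointId, e);
        }
    }
}
```

Passing `e` as the second constructor argument is the whole fix the comment asks for; the rest only reproduces enough of the surrounding flow to exercise it.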

(Tuple2<MasterTriggerRestoreHook<?>, Object> hookAndState : hooksAndStates) { + restoreHook(hookAndState.f1, hookAndState.f0, checkpointId); + } + + // trigger the remaining hooks without checkpointed state + for (MasterTriggerRestoreHook<?> hook : allHooks.values()) { + restoreHook(null, hook, checkpointId); + } + } + + private static <T> T deserializeState(MasterState state, MasterTriggerRestoreHook<?> hook) throws FlinkException { + @SuppressWarnings("unchecked") + final MasterTriggerRestoreHook<T> typedHook = (MasterTriggerRestoreHook<T>) hook; + final String id = hook.getIdentifier(); + + try { + final SimpleVersionedSerializer<T> deserializer = typedHook.createCheckpointDataSerializer(); + if (deserializer == null) { + throw new FlinkException("null serializer for state of hook " + hook.getIdentifier()); + } + + return deserializer.deserialize(state.version(), state.bytes()); + } + catch (Throwable t) { + throw new FlinkException("Cannot deserialize state for master hook '" + id + '\''); — End diff – exception is swallowed here
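The trigger path in the quoted `MasterHooks` diff blocks on each hook's result future with a bounded timeout. It then turns the three checked exceptions of a timed `get` into a single `FlinkException`. Below is a minimal sketch of that pattern. It assumes `java.util.concurrent.CompletableFuture` instead of the `org.apache.flink.runtime.concurrent.Future` used in the diff, and a hypothetical `HookException` standing in for `FlinkException`. The class name and the hook identifiers in `main` are made up for illustration. Unlike the diff, the sketch also attaches the original exception as the cause in the interrupt and timeout cases.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class HookTriggerSketch {

    /** Hypothetical stand-in for FlinkException. */
    static class HookException extends Exception {
        HookException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * Waits at most the given timeout for a hook's result. A null future means the
     * hook produced no state, so null is returned. The three failure modes of a
     * timed get() are mapped to a single checked exception.
     */
    static <T> T awaitHookResult(String id, CompletableFuture<T> resultFuture,
                                 long timeout, TimeUnit unit) throws HookException {
        if (resultFuture == null) {
            return null;
        }
        try {
            return resultFuture.get(timeout, unit);
        } catch (InterruptedException e) {
            // restore the interrupt flag so code further up the stack still sees it
            Thread.currentThread().interrupt();
            throw new HookException("Checkpoint master hook '" + id + "' was interrupted", e);
        } catch (ExecutionException e) {
            // unwrap: the relevant failure is the one thrown by the hook itself
            throw new HookException("Checkpoint master hook '" + id + "' produced an exception", e.getCause());
        } catch (TimeoutException e) {
            throw new HookException("Checkpoint master hook '" + id
                    + "' did not complete in time (" + timeout + " " + unit + ")", e);
        }
    }

    public static void main(String[] args) throws HookException {
        // a hook whose result is already available
        String value = awaitHookResult("example-hook",
                CompletableFuture.completedFuture("offsets"), 1, TimeUnit.SECONDS);
        System.out.println(value); // offsets

        // a hook that never completes runs into the timeout
        try {
            awaitHookResult("slow-hook", new CompletableFuture<String>(), 50, TimeUnit.MILLISECONDS);
        } catch (HookException e) {
            System.out.println(e.getMessage());
        }

        // a hook that fails: the original exception survives as the cause
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("external system unavailable"));
        try {
            awaitHookResult("failing-hook", failed, 1, TimeUnit.SECONDS);
        } catch (HookException e) {
            System.out.println(e.getCause().getMessage()); // external system unavailable
        }
    }
}
```

Keeping the original exception as the cause means the stack trace of a timeout or hook failure stays visible in the logs instead of being reduced to a message string.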
          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3782

          Thanks a lot for the fast review. Agree with both issues raised. Will address them while merging...
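One of the issues raised in the review is the swallowed exception in `deserializeState`. There, the `catch (Throwable t)` block builds a new `FlinkException` without passing `t` as its cause. Below is a sketch of one way to restore hooks while keeping that cause. It mirrors the two-phase structure of `restoreMasterHooks` in the diff: first match stored states to hooks by identifier and deserialize them all, then call restore on the matched hooks, and finally call the remaining hooks with `null`. It is not the code that was merged. `Hook`, `StoredState`, `RecordingHook` and `HookException` are hypothetical simplifications of `MasterTriggerRestoreHook`, `MasterState` and `FlinkException`.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class HookRestoreSketch {

    /** Hypothetical stand-in for FlinkException. */
    static class HookException extends Exception {
        HookException(String message, Throwable cause) { super(message, cause); }
    }

    /** Hypothetical simplified hook, not Flink's MasterTriggerRestoreHook. */
    interface Hook {
        String id();
        Object deserialize(byte[] bytes) throws Exception;
        void restore(Object state) throws Exception;
    }

    /** Hypothetical stored state: the hook identifier plus serialized bytes. */
    static final class StoredState {
        final String name;
        final byte[] bytes;
        StoredState(String name, byte[] bytes) { this.name = name; this.bytes = bytes; }
    }

    /** Example hook: decodes UTF-8 and remembers what it was restored with. */
    static final class RecordingHook implements Hook {
        final String id;
        Object restored = "<not called>";
        RecordingHook(String id) { this.id = id; }
        public String id() { return id; }
        public Object deserialize(byte[] bytes) { return new String(bytes, StandardCharsets.UTF_8); }
        public void restore(Object state) { restored = state; }
    }

    static void restoreAll(Map<String, Hook> hooks, List<StoredState> states,
                           boolean allowUnmatchedState) throws HookException {
        // work on a copy so matched hooks can be removed; LinkedHashMap keeps the order
        Map<String, Hook> remaining = new LinkedHashMap<>(hooks);
        List<Hook> matchedHooks = new ArrayList<>();
        List<Object> matchedStates = new ArrayList<>();

        // phase 1: match by identifier and deserialize everything before any restore call
        for (StoredState state : states) {
            Hook hook = remaining.remove(state.name);
            if (hook != null) {
                try {
                    matchedStates.add(hook.deserialize(state.bytes));
                } catch (Exception e) {
                    // pass the cause along instead of swallowing it
                    throw new HookException("Cannot deserialize state for master hook '" + state.name + "'", e);
                }
                matchedHooks.add(hook);
            } else if (!allowUnmatchedState) {
                throw new HookException("Found state '" + state.name + "' which is not resumed by any hook.", null);
            }
            // otherwise the unmatched state is dropped
        }

        // phase 2: restore hooks with state, then hooks without any stored state
        for (int i = 0; i < matchedHooks.size(); i++) {
            restoreOne(matchedHooks.get(i), matchedStates.get(i));
        }
        for (Hook hook : remaining.values()) {
            restoreOne(hook, null);
        }
    }

    private static void restoreOne(Hook hook, Object state) throws HookException {
        try {
            hook.restore(state);
        } catch (Exception e) {
            throw new HookException("Error while restoring master hook '" + hook.id() + "'", e);
        }
    }

    public static void main(String[] args) throws HookException {
        RecordingHook a = new RecordingHook("a");
        RecordingHook b = new RecordingHook("b");
        Map<String, Hook> hooks = new LinkedHashMap<>();
        hooks.put(a.id(), a);
        hooks.put(b.id(), b);

        // "a" has stored state, "b" has none, "old" matches no hook
        List<StoredState> states = Arrays.asList(
                new StoredState("a", "offsets-42".getBytes(StandardCharsets.UTF_8)),
                new StoredState("old", new byte[0]));

        restoreAll(hooks, states, true);
        System.out.println(a.restored + " / " + b.restored); // offsets-42 / null
    }
}
```

Deserializing every matched state before calling any `restore` means a corrupt state fails the restore before any hook has touched an external system. That appears to be the intent of the "first, deserialize all hook state" comment in the diff.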

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3782

          StephanEwen Stephan Ewen added a comment -

          Implemented in 90ca438106e63c5032ee2ad27e54e9f573eac386

          gyfora Gyula Fora added a comment -

          Not a big deal, but Stephan, I think you missed my comment.


            People

            • Assignee:
              StephanEwen Stephan Ewen
              Reporter:
              StephanEwen Stephan Ewen
            • Votes:
              0
              Watchers:
              4

              Dates

              • Created:
                Updated:
                Resolved:
