Details
- Type: Improvement
- Status: Closed
- Priority: Major
- Resolution: Fixed
- Version: 1.0.3
Description
Adding a header with version information to savepoints ensures that we can migrate savepoints between Flink versions in the future (for example when changing internal serialization formats between versions).
After talking with Till, we propose to add the following metadata:
- Magic number (int): identify data as savepoint
- Version (int): savepoint version (independent of Flink version)
- Data Offset (int): specifies where the actual savepoint data starts. This allows future Flink versions to add fields to the header without breaking compatibility, e.g. Flink 1.1 could read savepoints of Flink 2.0.
To support Flink 1.0 savepoints, which were written without a header, we have to fall back to reading the file without a header when the magic number is not found, and fail only if that also fails.
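A minimal sketch of the proposed layout, in Java to match the code under review. The class and method names are hypothetical; the magic value is the one used later in the pull request. The writer emits magic number, version and data offset before the payload. The reader skips any header bytes between the fields it knows and the offset, and returns `null` when the magic number is missing so the caller can fall back to the headerless Flink 1.0 format (a file written with plain Java serialization starts with the stream header `0xACED0005` instead).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

/** Hypothetical sketch of the proposed header: magic number, version, data offset. */
final class SavepointHeaderSketch {

    /** Illustrative magic value (the one used in the pull request under review). */
    static final int MAGIC_NUMBER = 0x4960672d;

    /** Bytes taken by the three fields this reader knows: magic, version, offset. */
    static final int KNOWN_HEADER_SIZE = 3 * Integer.BYTES;

    /** Parse result: savepoint version plus the raw savepoint bytes. */
    static final class Parsed {
        final int version;
        final byte[] payload;

        Parsed(int version, byte[] payload) {
            this.version = version;
            this.payload = payload;
        }
    }

    private SavepointHeaderSketch() {}

    /** Writes magic number, version and data offset, followed by the payload. */
    static byte[] write(int version, byte[] payload) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(MAGIC_NUMBER);
            out.writeInt(version);
            out.writeInt(KNOWN_HEADER_SIZE); // payload starts right after this header
            out.write(payload);
        }
        return bytes.toByteArray();
    }

    /**
     * Parses the header. Returns null if there is no magic number, so that the
     * caller can try the headerless Flink 1.0 format instead.
     */
    static Parsed read(byte[] file) throws IOException {
        if (file.length < KNOWN_HEADER_SIZE) {
            return null;
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(file))) {
            if (in.readInt() != MAGIC_NUMBER) {
                return null;
            }
            int version = in.readInt();
            int dataOffset = in.readInt();
            if (dataOffset < KNOWN_HEADER_SIZE || dataOffset > file.length) {
                throw new IOException("Corrupt header: data offset " + dataOffset);
            }
            // Fields added by newer writers sit between KNOWN_HEADER_SIZE and
            // dataOffset; they are skipped without being interpreted.
            return new Parsed(version, Arrays.copyOfRange(file, dataOffset, file.length));
        }
    }
}
```

Because the writer records the offset, an older reader only has to understand the first three fields to locate the payload; whether it can deserialize a newer savepoint version is a separate question.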
Activity
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2194#discussion_r69453976
— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoint.java —
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A successful checkpoint describes a checkpoint after all required tasks acknowledged it (with their state)
+ * and that is considered completed.
+ */
+public class Checkpoint implements Serializable {
+
+ private static final long serialVersionUID = -8360248179615702014L;
+
+ private final JobID job;
+
+ private final long checkpointID;
+
+ /** The timestamp when the checkpoint was triggered. */
+ private final long timestamp;
+
+ /** The duration of the checkpoint (completion timestamp - trigger timestamp). */
+ private final long duration;
+
+ /** States of the different task groups belonging to this checkpoint */
+ private final Map<JobVertexID, TaskState> taskStates;
+
+ public Checkpoint(
+ JobID job,
+ long checkpointID,
+ long timestamp,
+ long completionTimestamp,
+ Map<JobVertexID, TaskState> taskStates) {
+
+ this.job = job;
— End diff –
`checkNotNull`?
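Flink's `Preconditions.checkNotNull` (already imported in this file) returns its argument, like `java.util.Objects.requireNonNull`, so the null check and the field assignment can share a line. A self-contained sketch using the JDK method; `CheckpointSketch` is a hypothetical stand-in with `String`/`Object` in place of `JobID` and `TaskState`, and the timestamp-order check is an extra assumption that is not part of the diff.

```java
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for the reviewed Checkpoint class. */
final class CheckpointSketch {

    private final String job;                     // stands in for JobID
    private final long checkpointId;
    private final long timestamp;                 // trigger timestamp
    private final long duration;                  // completion timestamp - trigger timestamp
    private final Map<String, Object> taskStates; // stands in for Map<JobVertexID, TaskState>

    CheckpointSketch(
            String job,
            long checkpointId,
            long timestamp,
            long completionTimestamp,
            Map<String, Object> taskStates) {

        // requireNonNull returns its argument, so check and assignment fit on one line.
        this.job = Objects.requireNonNull(job, "job");
        this.taskStates = Objects.requireNonNull(taskStates, "taskStates");

        // Assumption: a checkpoint cannot complete before it was triggered.
        if (completionTimestamp < timestamp) {
            throw new IllegalArgumentException("Completion timestamp precedes trigger timestamp");
        }
        this.checkpointId = checkpointId;
        this.timestamp = timestamp;
        this.duration = completionTimestamp - timestamp;
    }

    long getDuration() {
        return duration;
    }
}
```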
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2194#discussion_r69454464
— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java —
@@ -41,13 +41,13 @@
void stop();
/**
- * Creates a {@link CompletedCheckpointStore} instance for a job.
+ * Creates a {@link CheckpointStore} instance for a job.
*
* @param jobId Job ID to recover checkpoints for
* @param userClassLoader User code class loader of the job
- * @return {@link CompletedCheckpointStore} instance for the job
+ * @return {@link CheckpointStore} instance for the job
*/
- CompletedCheckpointStore createCompletedCheckpoints(JobID jobId, ClassLoader userClassLoader)
+ CheckpointStore createCompletedCheckpoints(JobID jobId, ClassLoader userClassLoader)
- End diff –
Maybe we could rename this method to `createCheckpointStore` if not public.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2194#discussion_r69454948
— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java —
@@ -109,13 +109,13 @@ public boolean isDiscarded()
- public CompletedCheckpoint toCompletedCheckpoint() {
+ public Checkpoint toCompletedCheckpoint() {
- End diff –
Maybe we could rename this method to something like `finalizeCheckpoint` or `completeCheckpoint` if it's not public.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2194#discussion_r69455074
— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java —
@@ -37,10 +37,10 @@ public void stop() {
}
@Override
- public CompletedCheckpointStore createCompletedCheckpoints(JobID jobId, ClassLoader userClassLoader)
+ public CheckpointStore createCompletedCheckpoints(JobID jobId, ClassLoader userClassLoader)
- End diff –
Renaming?
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2194#discussion_r69458362
— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStore.java —
@@ -0,0 +1,225 @@
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A file system based
+ *
+ * <p>Stored savepoints have the following format:
+ * <pre>
+ * MagicNumber SavepointVersion Savepoint
+ * - MagicNumber => int
+ * - SavepointVersion => int (returned by Savepoint#getVersion())
+ * - Savepoint => bytes (serialized via version-specific SavepointSerializer)
+ * </pre>
+ */
+public class FsSavepointStore implements SavepointStore {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FsSavepointStore.class);
+
+ /** Magic number for sanity checks against stored savepoints. */
+ int MAGIC_NUMBER = 0x4960672d;
+
+ /** Root path for savepoints. */
+ private final Path rootPath;
+
+ /** Prefix for savepoint files. */
+ private final String prefix;
+
+ /** File system to use for file access. */
+ private final FileSystem fileSystem;
+
+ /**
+ * Creates a new file system based {@link SavepointStore}.
+ *
+ * @param rootPath Root path for savepoints
+ * @param prefix Prefix for savepoint files
+ * @throws IOException On failure to access root path
+ */
+ FsSavepointStore(String rootPath, String prefix) throws IOException
+
+ @Override
+ public <T extends Savepoint> String storeSavepoint(T savepoint) throws IOException {
+ Preconditions.checkNotNull(savepoint, "Savepoint");
+
+ Exception latestException = null;
+ Path path = null;
+ FSDataOutputStream fdos = null;
+
+ // Try to create a FS output stream
+ for (int attempt = 0; attempt < 10; attempt++) {
— End diff –
What's usually the reason why `fileSystem.create` can fail? Why did you choose 9 retries?
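One plausible reason a non-overwriting create fails is a name collision with an existing file; retrying with a fresh random name fixes that, while errors such as missing permissions would fail on every attempt. A sketch of that distinction using `java.nio.file` rather than Flink's `FileSystem` abstraction; `RandomFileCreator` and its parameters are hypothetical, and only collisions are retried.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

/** Hypothetical helper: create a new file under a random name, retrying only on collisions. */
final class RandomFileCreator {

    private RandomFileCreator() {}

    /**
     * Any IOException other than a name collision (permissions, missing
     * directory, ...) propagates immediately, since retrying would most
     * likely fail the same way.
     */
    static Path createUnique(Path dir, String prefix, int maxAttempts) throws IOException {
        FileAlreadyExistsException lastCollision = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            Path candidate = dir.resolve(prefix + UUID.randomUUID());
            try {
                // Atomically creates the file; fails if the name is already taken.
                return Files.createFile(candidate);
            } catch (FileAlreadyExistsException e) {
                lastCollision = e; // a new random name may succeed
            }
        }
        throw new IOException(
                "Could not create a file with a unique name after " + maxAttempts + " attempts",
                lastCollision);
    }
}
```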
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2194#discussion_r69459379
— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStore.java —
@@ -0,0 +1,225 @@
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A file system based
+ *
+ * <p>Stored savepoints have the following format:
+ * <pre>
+ * MagicNumber SavepointVersion Savepoint
+ * - MagicNumber => int
+ * - SavepointVersion => int (returned by Savepoint#getVersion())
+ * - Savepoint => bytes (serialized via version-specific SavepointSerializer)
+ * </pre>
+ */
+public class FsSavepointStore implements SavepointStore {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FsSavepointStore.class);
+
+ /** Magic number for sanity checks against stored savepoints. */
+ int MAGIC_NUMBER = 0x4960672d;
+
+ /** Root path for savepoints. */
+ private final Path rootPath;
+
+ /** Prefix for savepoint files. */
+ private final String prefix;
+
+ /** File system to use for file access. */
+ private final FileSystem fileSystem;
+
+ /**
+ * Creates a new file system based {@link SavepointStore}.
+ *
+ * @param rootPath Root path for savepoints
+ * @param prefix Prefix for savepoint files
+ * @throws IOException On failure to access root path
+ */
+ FsSavepointStore(String rootPath, String prefix) throws IOException
+
+ @Override
+ public <T extends Savepoint> String storeSavepoint(T savepoint) throws IOException {
+ Preconditions.checkNotNull(savepoint, "Savepoint");
+
+ Exception latestException = null;
+ Path path = null;
+ FSDataOutputStream fdos = null;
+
+ // Try to create a FS output stream
+ for (int attempt = 0; attempt < 10; attempt++) {
+ path = new Path(rootPath, FileUtils.getRandomFilename(prefix));
+ try
+ catch (Exception e) {
+ latestException = e;
+ }
+ }
+
+ if (fdos == null)
+
+ boolean success = false;
+ try (DataOutputStream dos = new DataOutputStream(fdos))
finally {
+ fdos.close();
+
+ if (!success && fileSystem.exists(path)) {
+ if (!fileSystem.delete(path, true))
+ }
+ }
+
+ return path.toString();
+ }
+
+ @Override
+ public Savepoint loadSavepoint(String path) throws IOException {
+ Preconditions.checkNotNull(path, "Path");
+
+ try {
+ try (FSDataInputStream fdis = createFsInputStream(new Path(path))) {
+ try (DataInputStream dis = new DataInputStream(fdis)) {
— End diff –
Why do we have a triple nested try block here? Wouldn't it also work to write:
```
try (FSDataInputStream fdis = createFsInputStream(new Path(path));
        DataInputStream dis = new DataInputStream(fdis)) {
    ...
} catch (Throwable t) {
    ...
}
```
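Resources declared in one try-with-resources statement are closed in reverse declaration order, also when the body throws, so a single statement covers both streams. A small sketch with plain JDK streams; `SingleTrySketch.readMagic` is a hypothetical helper, not code from the pull request.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical helper mirroring the single try-with-resources suggested above. */
final class SingleTrySketch {

    private SingleTrySketch() {}

    static int readMagic(InputStream source) throws IOException {
        // Both resources in one statement: on exit they are closed in reverse
        // order (dis, then in), whether readInt() returns normally or throws.
        try (InputStream in = source;
             DataInputStream dis = new DataInputStream(in)) {
            return dis.readInt();
        }
    }
}
```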
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2194#discussion_r69460544
— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStore.java —
@@ -0,0 +1,225 @@
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A file system based
+ *
+ * <p>Stored savepoints have the following format:
+ * <pre>
+ * MagicNumber SavepointVersion Savepoint
+ * - MagicNumber => int
+ * - SavepointVersion => int (returned by Savepoint#getVersion())
+ * - Savepoint => bytes (serialized via version-specific SavepointSerializer)
+ * </pre>
+ */
+public class FsSavepointStore implements SavepointStore {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FsSavepointStore.class);
+
+ /** Magic number for sanity checks against stored savepoints. */
+ int MAGIC_NUMBER = 0x4960672d;
+
+ /** Root path for savepoints. */
+ private final Path rootPath;
+
+ /** Prefix for savepoint files. */
+ private final String prefix;
+
+ /** File system to use for file access. */
+ private final FileSystem fileSystem;
+
+ /**
+ * Creates a new file system based {@link SavepointStore}.
+ *
+ * @param rootPath Root path for savepoints
+ * @param prefix Prefix for savepoint files
+ * @throws IOException On failure to access root path
+ */
+ FsSavepointStore(String rootPath, String prefix) throws IOException
+
+ @Override
+ public <T extends Savepoint> String storeSavepoint(T savepoint) throws IOException {
+ Preconditions.checkNotNull(savepoint, "Savepoint");
+
+ Exception latestException = null;
+ Path path = null;
+ FSDataOutputStream fdos = null;
+
+ // Try to create a FS output stream
+ for (int attempt = 0; attempt < 10; attempt++) {
+ path = new Path(rootPath, FileUtils.getRandomFilename(prefix));
+ try
+ catch (Exception e) {
+ latestException = e;
+ }
+ }
+
+ if (fdos == null)
+
+ boolean success = false;
+ try (DataOutputStream dos = new DataOutputStream(fdos))
finally {
+ fdos.close();
+
+ if (!success && fileSystem.exists(path)) {
+ if (!fileSystem.delete(path, true))
+ }
+ }
+
+ return path.toString();
+ }
+
+ @Override
+ public Savepoint loadSavepoint(String path) throws IOException {
+ Preconditions.checkNotNull(path, "Path");
+
+ try {
+ try (FSDataInputStream fdis = createFsInputStream(new Path(path))) {
+ try (DataInputStream dis = new DataInputStream(fdis)) {
+ int magicNumber = dis.readInt();
+ int version = dis.readInt();
+
+ if (magicNumber == MAGIC_NUMBER)
+ else {
+ throw new IllegalStateException("Unexpected magic number. This indicates " +
+ "that the specified file is not a proper savepoint or " +
+ "the file has been corrupted.");
+ }
+ }
+ }
+ } catch (Throwable t) {
— End diff –
Exceptions thrown because of a wrong path, or because the input stream couldn't be opened, will also trigger loading as savepoint version 0. Maybe we could distinguish between failing to open the file stream and a wrong magic number or an unreadable version.
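One way to separate the two failure classes is to keep the open call outside the fallback logic and to treat only a too-short file or a mismatched magic number as "no header". A sketch with `java.nio.file`; `HeaderProbe` and `hasHeader` are hypothetical names. A missing file surfaces as `NoSuchFileException` instead of silently triggering the version 0 loader.

```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper that separates "cannot open" from "no header". */
final class HeaderProbe {

    static final int MAGIC_NUMBER = 0x4960672d;

    private HeaderProbe() {}

    /**
     * Returns true if the file starts with the magic number, false if it is too
     * short or starts with something else (a candidate for the legacy loader).
     * Failures to open the file are not caught and propagate to the caller.
     */
    static boolean hasHeader(Path path) throws IOException {
        try (InputStream in = Files.newInputStream(path);
             DataInputStream dis = new DataInputStream(in)) {
            try {
                return dis.readInt() == MAGIC_NUMBER;
            } catch (EOFException e) {
                return false; // shorter than one int: cannot carry a header
            }
        }
    }
}
```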
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2194#discussion_r69461170
— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java —
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+/**
+ * Savepoint store used to persist
instances.
+ *
+ * <p>The main implementation is the
. We also have the
+ *
for historical reasons (introduced in Flink 1.0).
+ */
+public interface SavepointStore {
+
+ /**
+ * Stores the savepoint.
+ *
+ * @param savepoint Savepoint to be stored
+ * @param <T> Savepoint type
+ * @return Path of stored savepoint
+ * @throws Exception Failures during store are forwarded
+ */
+ <T extends Savepoint> String storeSavepoint(T savepoint) throws Exception;
+
+ /**
+ * Loads the savepoint at the specified path.
+ *
+ * @param path Path of savepoint to load
+ * @return The loaded savepoint
+ * @throws Exception Failures during load are forwarded
+ */
+ Savepoint loadSavepoint(String path) throws Exception;
+
+ /**
+ * Disposes the savepoint at the specified path.
+ *
+ * <p>The class loader is needed, because savepoints can currently point to
+ * arbitrary snapshot
+ * instances, which need the user code class loader for deserialization.
+ *
+ * @param path Path of savepoint to dispose
+ * @param classLoader Class loader for disposal
+ * @throws Exception Failures during disposal are forwarded
+ */
+ void disposeSavepoint(String path, ClassLoader classLoader) throws Exception;
+
+ /**
+ * Shuts down the savepoint store.
+ *
+ * <p>Only necessary for implementations where the savepoint life-cycle is
+ * bound to the cluster life-cycle.
+ *
+ * @throws Exception Failures during shut down are forwarded
+ */
+ void shutDown() throws Exception;
— End diff –
I think so far we've usually written shutdown without a capital letter.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2194#discussion_r69461561
— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStore.java —
@@ -0,0 +1,225 @@
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A file system based
+ *
+ * <p>Stored savepoints have the following format:
+ * <pre>
+ * MagicNumber SavepointVersion Savepoint
+ * - MagicNumber => int
+ * - SavepointVersion => int (returned by Savepoint#getVersion())
+ * - Savepoint => bytes (serialized via version-specific SavepointSerializer)
+ * </pre>
+ */
+public class FsSavepointStore implements SavepointStore {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FsSavepointStore.class);
+
+ /** Magic number for sanity checks against stored savepoints. */
+ int MAGIC_NUMBER = 0x4960672d;
+
+ /** Root path for savepoints. */
+ private final Path rootPath;
+
+ /** Prefix for savepoint files. */
+ private final String prefix;
+
+ /** File system to use for file access. */
+ private final FileSystem fileSystem;
+
+ /**
+ * Creates a new file system based {@link SavepointStore}.
+ *
+ * @param rootPath Root path for savepoints
+ * @param prefix Prefix for savepoint files
+ * @throws IOException On failure to access root path
+ */
+ FsSavepointStore(String rootPath, String prefix) throws IOException
+
+ @Override
+ public <T extends Savepoint> String storeSavepoint(T savepoint) throws IOException {
+ Preconditions.checkNotNull(savepoint, "Savepoint");
+
+ Exception latestException = null;
+ Path path = null;
+ FSDataOutputStream fdos = null;
+
+ // Try to create a FS output stream
+ for (int attempt = 0; attempt < 10; attempt++) {
+ path = new Path(rootPath, FileUtils.getRandomFilename(prefix));
+ try
+ catch (Exception e) {
+ latestException = e;
+ }
+ }
+
+ if (fdos == null)
+
+ boolean success = false;
+ try (DataOutputStream dos = new DataOutputStream(fdos))
finally {
+ fdos.close();
+
+ if (!success && fileSystem.exists(path)) {
+ if (!fileSystem.delete(path, true))
+ }
+ }
+
+ return path.toString();
+ }
+
+ @Override
+ public Savepoint loadSavepoint(String path) throws IOException {
+ Preconditions.checkNotNull(path, "Path");
+
+ try {
+ try (FSDataInputStream fdis = createFsInputStream(new Path(path))) {
+ try (DataInputStream dis = new DataInputStream(fdis)) {
+ int magicNumber = dis.readInt();
+ int version = dis.readInt();
+
+ if (magicNumber == MAGIC_NUMBER)
+ else {
+ throw new IllegalStateException("Unexpected magic number. This indicates " +
+ "that the specified file is not a proper savepoint or " +
+ "the file has been corrupted.");
+ }
+ }
+ }
+ } catch (Throwable t) {
+ // Flink 1.0 did not store a header, check if it is an old savepoint.
+ // Only after this fails as well we can be sure that it is an actual
+ // failure.
+ Savepoint savepoint = tryLoadSavepointV0(path);
+ if (savepoint == null)
+ else {
+ return savepoint;
+ }
+ }
+ }
+
+ @Override
+ public void disposeSavepoint(String path, ClassLoader classLoader) throws Exception {
+ Preconditions.checkNotNull(path, "Path");
+ Preconditions.checkNotNull(classLoader, "Class loader");
+
+ try {
+ Savepoint savepoint = loadSavepoint(path);
+ savepoint.dispose(classLoader);
+
+ Path filePath = new Path(path);
+
+ if (fileSystem.exists(filePath)) {
+ if (!fileSystem.delete(filePath, true))
+ } else {
+ throw new IllegalArgumentException("Invalid path '" + filePath.toUri() + "'.");
+ }
+ } catch (Throwable t) {
+ throw new IOException("Failed to dispose savepoint " + path + ".", t);
+ }
+ }
+
+ @Override
+ public void shutDown() throws Exception
+
+ private FSDataInputStream createFsInputStream(Path path) throws IOException {
+ if (fileSystem.exists(path))
+ else {
+ throw new IllegalArgumentException("Invalid path '" + path.toUri() + "'.");
+ }
+ }
+
+ /**
+ * Tries to load a Flink 1.0 savepoint, which was not stored with a header.
+ *
+ * <p>This can be removed for Flink 1.2.
— End diff –
Why can we remove this for Flink 1.2? Wouldn't that mean that Flink 1.2 can only restore savepoints from version 1.1 onwards? I think it should also be possible to restore savepoints from version 1.0 with version 1.2.
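Keeping Flink 1.0 support can be as cheap as leaving version 0 registered in a version-to-serializer lookup; removing that entry is what would limit restores to savepoints from 1.1 onwards. A sketch of such a registry; `SerializerRegistrySketch` and its string-producing decoders are hypothetical placeholders for real serializers (the test at the end of this section reflects on a `SERIALIZERS` map in `SavepointSerializers`, which suggests a similar structure, but that is an assumption).

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical version-to-decoder registry. */
final class SerializerRegistrySketch {

    private static final Map<Integer, Function<byte[], String>> DECODERS;

    static {
        Map<Integer, Function<byte[], String>> decoders = new HashMap<>();
        // Version 0: Flink 1.0 savepoints (no header, plain Java serialization).
        decoders.put(0, bytes -> "v0 savepoint, " + bytes.length + " bytes");
        // Version 1: savepoints written with the new header.
        decoders.put(1, bytes -> "v1 savepoint, " + bytes.length + " bytes");
        DECODERS = Collections.unmodifiableMap(decoders);
    }

    private SerializerRegistrySketch() {}

    static String decode(int version, byte[] payload) {
        Function<byte[], String> decoder = DECODERS.get(version);
        if (decoder == null) {
            throw new IllegalArgumentException("Unsupported savepoint version " + version);
        }
        return decoder.apply(payload);
    }
}
```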
Github user StephanEwen commented on the issue:
https://github.com/apache/flink/pull/2194
test comment
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2194#discussion_r69520756
— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Serializer.java —
@@ -0,0 +1,58 @@
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.OutputStream;
+
+/**
+ * Serializer for
instances.
+ *
+ * <p>This format was introduced with Flink 1.0.0 and simply wraps a
+ *
instance and uses plain Java serialization.
+ * That's why we have to stick to the exact same format, including classes,
+ * which are only part of our code base for backwards compatibility.
+ */
+public class SavepointV0Serializer implements SavepointSerializer<SavepointV0> {
+
+ public static final SavepointV0Serializer INSTANCE = new SavepointV0Serializer();
— End diff –
Maybe we should add a private constructor so that nobody can instantiate this class.
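The suggested pattern, sketched on a hypothetical stateless serializer: a `final` class whose only constructor is private, so `INSTANCE` is the single instance (short of reflection). The `serialize` method is a placeholder, not the `SavepointV0Serializer` API.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical stateless serializer following the suggested singleton pattern. */
final class StatelessSerializer {

    /** The only instance; safe to share because the class holds no state. */
    public static final StatelessSerializer INSTANCE = new StatelessSerializer();

    /** Private: other classes must use INSTANCE. */
    private StatelessSerializer() {}

    byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }
}
```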
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2194#discussion_r69520850
— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Serializer.java —
@@ -0,0 +1,58 @@
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.OutputStream;
+
+/**
+ * Serializer for
instances.
+ *
+ * <p>This format was introduced with Flink 1.0.0 and simply wraps a
+ *
instance and uses plain Java serialization.
+ * That's why we have to stick to the exact same format, including classes,
+ * which are only part of our code base for backwards compatibility.
+ */
+public class SavepointV0Serializer implements SavepointSerializer<SavepointV0> {
+
+ public static final SavepointV0Serializer INSTANCE = new SavepointV0Serializer();
+
+ @Override
+ public void serialize(SavepointV0 savepoint, OutputStream os) throws IOException
+
+ @Override
+ public SavepointV0 deserialize(InputStream is) throws IOException {
+ try (ObjectInputStream ois = new ObjectInputStream(is)) {
+ try {
— End diff –
Isn't one try block enough?
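A try-with-resources statement can carry its own `catch` clause, which covers resource creation, the body, and the implicit close. A sketch of a deserializer written that way; `SingleTryDeserializer` is hypothetical, and converting `ClassNotFoundException` into an `IOException` is an assumption about what the inner block was for.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;

/** Hypothetical deserializer using a single try-with-resources with a catch clause. */
final class SingleTryDeserializer {

    private SingleTryDeserializer() {}

    /** Reads one object; throws ClassCastException if it is not of the given type. */
    static <T> T deserialize(InputStream is, Class<T> type) throws IOException {
        try (ObjectInputStream ois = new ObjectInputStream(is)) {
            return type.cast(ois.readObject());
        } catch (ClassNotFoundException e) {
            // Wrap the checked exception so callers only deal with IOException.
            throw new IOException("Could not load class of serialized savepoint", e);
        }
    }
}
```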
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2194#discussion_r69522651
— Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java —
@@ -0,0 +1,308 @@
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.StateForTask;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.FileUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Matchers;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+public class FsSavepointStoreTest {
+
+ @Rule
+ public TemporaryFolder tmp = new TemporaryFolder();
+
+ /**
+ * Tests a store-load-dispose sequence.
+ */
+ @Test
+ public void testStoreLoadDispose() throws Exception
+
+ /**
+ * Tests loading of a Flink 1.0 savepoint.
+ */
+ @Test
+ public void testLoadFlink10Savepoint() throws Exception {
+ // Copied from Flink 1.0
+ CompletedCheckpoint checkpoint = SavepointV0Test.createCompletedCheckpoint(
+ new JobID(),
+ 10210230,
+ System.currentTimeMillis(),
+ System.currentTimeMillis(),
+ 32);
+
+ // Copied from Flink 1.0 savepoint serialization code path
+ Path filePath = new Path(tmp.getRoot().getPath(), FileUtils.getRandomFilename("fs-savepoint-store-test-"));
+ FSDataOutputStream outStream = FileSystem.get(filePath.toUri()).create(filePath, false);
+ try (ObjectOutputStream os = new ObjectOutputStream(outStream))
+
+ // Load with savepoint store
+ FsSavepointStore store = new FsSavepointStore(tmp.getRoot().getPath(), "fs-savepoint-store-test-");
+ SavepointV0 savepoint = (SavepointV0) store.loadSavepoint(filePath.toString());
+
+ // Verify all expected task states
+ for (StateForTask stateForTask : checkpoint.getStates()) {
+ JobVertexID expectedJobVertexId = stateForTask.getOperatorId();
+ int subTaskIndex = stateForTask.getSubtask();
+
+ boolean foundMatch = false;
+ for (TaskState taskState : savepoint.getTaskStates()) {
+ if (taskState.getJobVertexID().equals(expectedJobVertexId))
+ }
+
+ assertTrue("Did not find TaskState for " + stateForTask, foundMatch);
+ }
+ }
+
+ /**
+ * Tests loading with unexpected file types.
+ */
+ @Test
+ public void testUnexpectedFileType() throws Exception {
+ FsSavepointStore store = new FsSavepointStore(tmp.getRoot().getPath(), "fs-savepoint-store-test-");
+
+ // Random file
+ Path filePath = new Path(tmp.getRoot().getPath(), UUID.randomUUID().toString());
+ FSDataOutputStream fdos = FileSystem.get(filePath.toUri()).create(filePath, false);
+ DataOutputStream dos = new DataOutputStream(fdos);
+ for (int i = 0; i < 10; i++)
+
+ try
+
+ // Very short file (EOFException)
+ filePath = new Path(tmp.getRoot().getPath(), UUID.randomUUID().toString());
+ fdos = FileSystem.get(filePath.toUri()).create(filePath, false);
+ dos = new DataOutputStream(fdos);
+
+ dos.writeByte(0);
+
+ try {
+ store.loadSavepoint(filePath.toString());
+ fail("Did not throw expected Exception");
+ } catch (IOException e) {
+ // Unexpected magic number
+ assertTrue(e.getCause() instanceof EOFException);
+ }
+ }
+
+ /**
+ * Tests addition of a new savepoint version.
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testMultipleSavepointVersions() throws Exception {
+ Field field = SavepointSerializers.class.getDeclaredField("SERIALIZERS");
+ field.setAccessible(true);
+ Map<Integer, SavepointSerializer<?>> serializers = (Map<Integer, SavepointSerializer<?>>) field.get(null);
+
+ assertTrue(serializers.size() >= 1);
+
+ FsSavepointStore store = new FsSavepointStore(tmp.getRoot().getPath(), "fs-savepoint-store-test-");
+ assertEquals(0, tmp.getRoot().listFiles().length);
+
+ // New savepoint type for test
+ int version = ThreadLocalRandom.current().nextInt();
+ long checkpointId = ThreadLocalRandom.current().nextLong();
+
+ // Add serializer
+ serializers.put(version, NewSavepointSerializer.INSTANCE);
+
+ TestSavepoint newSavepoint = new TestSavepoint(version, checkpointId);
+ String pathNewSavepoint = store.storeSavepoint(newSavepoint);
+ assertEquals(1, tmp.getRoot().listFiles().length);
+
+ // Savepoint v0
— End diff –
savepoint v1
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2194#discussion_r69522896
— Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java —
@@ -0,0 +1,308 @@
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.StateForTask;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.FileUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Matchers;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+public class FsSavepointStoreTest {
+
+ @Rule
+ public TemporaryFolder tmp = new TemporaryFolder();
+
+ /**
+ * Tests a store-load-dispose sequence.
+ */
+ @Test
+ public void testStoreLoadDispose() throws Exception
+
+ /**
+ * Tests loading of a Flink 1.0 savepoint.
+ */
+ @Test
+ public void testLoadFlink10Savepoint() throws Exception {
+ // Copied from Flink 1.0
+ CompletedCheckpoint checkpoint = SavepointV0Test.createCompletedCheckpoint(
+ new JobID(),
+ 10210230,
+ System.currentTimeMillis(),
+ System.currentTimeMillis(),
+ 32);
+
+ // Copied from Flink 1.0 savepoint serialization code path
+ Path filePath = new Path(tmp.getRoot().getPath(), FileUtils.getRandomFilename("fs-savepoint-store-test-"));
+ FSDataOutputStream outStream = FileSystem.get(filePath.toUri()).create(filePath, false);
+ try (ObjectOutputStream os = new ObjectOutputStream(outStream))
+
+ // Load with savepoint store
+ FsSavepointStore store = new FsSavepointStore(tmp.getRoot().getPath(), "fs-savepoint-store-test-");
+ SavepointV0 savepoint = (SavepointV0) store.loadSavepoint(filePath.toString());
+
+ // Verify all expected task states
+ for (StateForTask stateForTask : checkpoint.getStates()) {
+ JobVertexID expectedJobVertexId = stateForTask.getOperatorId();
+ int subTaskIndex = stateForTask.getSubtask();
+
+ boolean foundMatch = false;
+ for (TaskState taskState : savepoint.getTaskStates()) {
+ if (taskState.getJobVertexID().equals(expectedJobVertexId))
+ }
+
+ assertTrue("Did not find TaskState for " + stateForTask, foundMatch);
+ }
+ }
+
+ /**
+ * Tests loading with unexpected file types.
+ */
+ @Test
+ public void testUnexpectedFileType() throws Exception {
+ FsSavepointStore store = new FsSavepointStore(tmp.getRoot().getPath(), "fs-savepoint-store-test-");
+
+ // Random file
+ Path filePath = new Path(tmp.getRoot().getPath(), UUID.randomUUID().toString());
+ FSDataOutputStream fdos = FileSystem.get(filePath.toUri()).create(filePath, false);
+ DataOutputStream dos = new DataOutputStream(fdos);
+ for (int i = 0; i < 10; i++)
+
+ try
+
+ // Very short file (EOFException)
+ filePath = new Path(tmp.getRoot().getPath(), UUID.randomUUID().toString());
+ fdos = FileSystem.get(filePath.toUri()).create(filePath, false);
+ dos = new DataOutputStream(fdos);
+
+ dos.writeByte(0);
+
+ try {
+ store.loadSavepoint(filePath.toString());
+ fail("Did not throw expected Exception");
+ } catch (IOException e) {
+ // Unexpected magic number
+ assertTrue(e.getCause() instanceof EOFException);
+ }
+ }
+
+ /**
+ * Tests addition of a new savepoint version.
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testMultipleSavepointVersions() throws Exception
+
+ /**
+ * Tests that an exception during store cleans up any created files.
— End diff –
Shouldn't we check that only the failed savepoint is cleaned up?
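The question is whether the test should assert that cleanup is limited to the file of the failed store. The following is a minimal sketch of such a check. It uses `java.nio.file` on a local temporary directory in place of Flink's `FileSystem` abstraction, and `CleanupOnFailureSketch`, `store` and `listFiles` are hypothetical names that are not part of the PR:

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class CleanupOnFailureSketch {

    /**
     * Hypothetical store: writes {@code data} to a new, randomly named file and
     * deletes that file (and only that file) if the write does not complete.
     */
    static Path store(Path dir, byte[] data, boolean failMidWrite) throws IOException {
        Path path = dir.resolve("fs-savepoint-store-test-" + UUID.randomUUID());
        boolean success = false;
        try (OutputStream out = Files.newOutputStream(path)) {
            out.write(data);
            if (failMidWrite) {
                throw new IOException("Simulated failure while storing " + path);
            }
            success = true;
        } finally {
            // The stream is already closed here; remove the partial file on failure.
            if (!success) {
                Files.deleteIfExists(path);
            }
        }
        return path;
    }

    /** Lists the entries of {@code dir}, closing the directory stream afterwards. */
    static List<Path> listFiles(Path dir) throws IOException {
        try (Stream<Path> entries = Files.list(dir)) {
            return entries.collect(Collectors.toList());
        }
    }
}
```

A test that matches the question would store one savepoint successfully, force a second store to fail, and then assert that the directory still contains exactly the first file.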
Github user tillrohrmann commented on the issue:
https://github.com/apache/flink/pull/2194
Really good work @uce. The code is well structured and thoroughly tested. I had only some minor comments.
While testing the code with the streaming state machine job I stumbled across a problem, though. Recovering from a Flink 1.0 savepoint does not work if the job contains a `keyBy` operation. The reason is that we had a faulty murmur hash implementation in Flink 1.0, and because it was corrected, the mapping of keys to subtasks has changed. Consequently, the restored state no longer matches the key space assigned to each operator. This is the problematic [commit](https://github.com/apache/flink/commit/641a0d436c9b7a34ff33ceb370cf29962cac4dee).
Thus, this change is actually breaking our backwards compatibility with respect to savepoints. In order to solve the problem I see three possibilities:
- Revert the changes of this commit. But we don't know how the flawed murmur hash performs.
- Develop a tool which can repartition savepoints
- Don't support backwards compatibility between version 1.0 and 1.1
I think that option 3 is not doable given our backwards compatibility promise. Furthermore, option 2 is not really straightforward if the user has a keyed stream where he uses the `Checkpointed` interface. Given that the release is upcoming, I think option 1 would be the best way to solve the problem.
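The following toy router illustrates why a hash fix moves state. It assigns a key to subtask `floorMod(hash, parallelism)`. This is not Flink's partitioning code. It compares a key's plain `hashCode` with the same hash passed through the MurmurHash3 32-bit finalizer (`fmix32`), which stands in for any change to the hash function between versions. `KeyRoutingSketch`, `subtaskFor` and `movedKeys` are hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;

class KeyRoutingSketch {

    /** MurmurHash3 32-bit finalization mix (fmix32). */
    static int fmix32(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    /** Routes a hash to one of {@code parallelism} subtasks (always non-negative). */
    static int subtaskFor(int hash, int parallelism) {
        return Math.floorMod(hash, parallelism);
    }

    /** Keys in [0, numKeys) whose subtask differs between the two hash functions. */
    static List<Integer> movedKeys(int numKeys, int parallelism) {
        List<Integer> moved = new ArrayList<>();
        for (int key = 0; key < numKeys; key++) {
            int h = Integer.hashCode(key);
            if (subtaskFor(h, parallelism) != subtaskFor(fmix32(h), parallelism)) {
                moved.add(key);
            }
        }
        return moved;
    }
}
```

Any non-empty result means that state snapshotted on one subtask would be restored on a subtask that no longer receives those keys. This is the mismatch described above.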
Github user uce commented on the issue:
https://github.com/apache/flink/pull/2194
Thanks for the thorough review! I didn't catch that in my manual test. I would also be in favour of 1) for the same reasons.
Github user uce commented on the issue:
https://github.com/apache/flink/pull/2194
@greghogan You fixed the murmur hash in the mentioned commit. Do you think it's OK to revert back to the wrong version we had before? Did you actually observe bad behaviour of the wrong murmur hash variant?
Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/2194#discussion_r69722894
— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoint.java —
@@ -0,0 +1,135 @@
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A successful checkpoint describes a checkpoint after all required tasks acknowledged it (with their state)
+ * and that is considered completed.
+ */
+public class Checkpoint implements Serializable {
+
+ private static final long serialVersionUID = -8360248179615702014L;
+
+ private final JobID job;
+
+ private final long checkpointID;
+
+ /** The timestamp when the checkpoint was triggered. */
+ private final long timestamp;
+
+ /** The duration of the checkpoint (completion timestamp - trigger timestamp). */
+ private final long duration;
+
+ /** States of the different task groups belonging to this checkpoint */
+ private final Map<JobVertexID, TaskState> taskStates;
+
+ public Checkpoint(
+ JobID job,
+ long checkpointID,
+ long timestamp,
+ long completionTimestamp,
+ Map<JobVertexID, TaskState> taskStates) {
+
+ this.job = job;
— End diff –
Will do (this is an unrelated change though as `Checkpoint` is a renaming of the current `master`'s `CompletedCheckpoint`)
Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/2194#discussion_r69723013
— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java —
@@ -41,13 +41,13 @@
void stop();
/**
- * Creates a {@link CompletedCheckpointStore} instance for a job.
+ * Creates a {@link CheckpointStore} instance for a job.
*
* @param jobId Job ID to recover checkpoints for
* @param userClassLoader User code class loader of the job
- * @return {@link CompletedCheckpointStore} instance for the job
+ * @return {@link CheckpointStore} instance for the job
*/
- CompletedCheckpointStore createCompletedCheckpoints(JobID jobId, ClassLoader userClassLoader)
+ CheckpointStore createCompletedCheckpoints(JobID jobId, ClassLoader userClassLoader)
- End diff –
Will do, the current name is not good
Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/2194#discussion_r69723425
— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java —
@@ -109,13 +109,13 @@ public boolean isDiscarded()
- public CompletedCheckpoint toCompletedCheckpoint() {
+ public Checkpoint toCompletedCheckpoint() {
- End diff –
Will do
Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/2194#discussion_r69723478
— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java —
@@ -37,10 +37,10 @@ public void stop() {
}
@Override
- public CompletedCheckpointStore createCompletedCheckpoints(JobID jobId, ClassLoader userClassLoader)
+ public CheckpointStore createCompletedCheckpoints(JobID jobId, ClassLoader userClassLoader)
- End diff –
Yes, to be consistent with the `CompletedCheckpoint => Checkpoint` renaming
Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/2194#discussion_r69723923
— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStore.java —
@@ -0,0 +1,225 @@
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A file system based
+ *
+ * <p>Stored savepoints have the following format:
+ * <pre>
+ * MagicNumber SavepointVersion Savepoint
+ * - MagicNumber => int
+ * - SavepointVersion => int (returned by Savepoint#getVersion())
+ * - Savepoint => bytes (serialized via version-specific SavepointSerializer)
+ * </pre>
+ */
+public class FsSavepointStore implements SavepointStore {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FsSavepointStore.class);
+
+ /** Magic number for sanity checks against stored savepoints. */
+ int MAGIC_NUMBER = 0x4960672d;
+
+ /** Root path for savepoints. */
+ private final Path rootPath;
+
+ /** Prefix for savepoint files. */
+ private final String prefix;
+
+ /** File system to use for file access. */
+ private final FileSystem fileSystem;
+
+ /**
+ * Creates a new file system based {@link SavepointStore}.
+ *
+ * @param rootPath Root path for savepoints
+ * @param prefix Prefix for savepoint files
+ * @throws IOException On failure to access root path
+ */
+ FsSavepointStore(String rootPath, String prefix) throws IOException
+
+ @Override
+ public <T extends Savepoint> String storeSavepoint(T savepoint) throws IOException {
+ Preconditions.checkNotNull(savepoint, "Savepoint");
+
+ Exception latestException = null;
+ Path path = null;
+ FSDataOutputStream fdos = null;
+
+ // Try to create a FS output stream
+ for (int attempt = 0; attempt < 10; attempt++) {
— End diff –
Since it can also be a distributed FS like HDFS, it can fail for various reasons. The code here is moved from what we currently have in master (including the number of retries etc.). I think it was originally moved from `FileSystemStateStorageHelper` to `FileSystemStateStore` and then to here.
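The following sketch shows the retry pattern referred to here: pick a fresh random file name, make a bounded number of attempts, and keep the last exception as the cause. It uses `java.nio.file` on a local file system, and `Files.createFile` stands in for Flink's `FileSystem#create(path, false)`. `RetryingCreateSketch` and `createWithRetries` are hypothetical names:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

class RetryingCreateSketch {

    /**
     * Creates a new file named {@code prefix + <random UUID>} under {@code rootDir},
     * trying at most {@code maxAttempts} names. Each failure is remembered so that
     * the final exception carries the cause of the last failed attempt.
     */
    static Path createWithRetries(Path rootDir, String prefix, int maxAttempts) throws IOException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        IOException latestException = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            Path path = rootDir.resolve(prefix + UUID.randomUUID());
            try {
                // Atomic create; fails if the name is taken or the directory is unavailable.
                return Files.createFile(path);
            } catch (IOException e) {
                latestException = e;
            }
        }
        throw new IOException("Could not create a file in " + rootDir
                + " after " + maxAttempts + " attempts", latestException);
    }
}
```

Because every attempt uses a new random name, a collision or a transient error on one attempt does not doom the next one. If all attempts fail, the caller still sees why.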
Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/2194#discussion_r69723989
— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStore.java —
@@ -0,0 +1,225 @@
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A file system based {@link SavepointStore}.
+ *
+ * <p>Stored savepoints have the following format:
+ * <pre>
+ * MagicNumber SavepointVersion Savepoint
+ * - MagicNumber => int
+ * - SavepointVersion => int (returned by Savepoint#getVersion())
+ * - Savepoint => bytes (serialized via version-specific SavepointSerializer)
+ * </pre>
+ */
+public class FsSavepointStore implements SavepointStore {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FsSavepointStore.class);
+
+ /** Magic number for sanity checks against stored savepoints. */
+ int MAGIC_NUMBER = 0x4960672d;
+
+ /** Root path for savepoints. */
+ private final Path rootPath;
+
+ /** Prefix for savepoint files. */
+ private final String prefix;
+
+ /** File system to use for file access. */
+ private final FileSystem fileSystem;
+
+ /**
+ * Creates a new file system based {@link SavepointStore}.
+ *
+ * @param rootPath Root path for savepoints
+ * @param prefix Prefix for savepoint files
+ * @throws IOException On failure to access root path
+ */
+ FsSavepointStore(String rootPath, String prefix) throws IOException
+
+ @Override
+ public <T extends Savepoint> String storeSavepoint(T savepoint) throws IOException {
+ Preconditions.checkNotNull(savepoint, "Savepoint");
+
+ Exception latestException = null;
+ Path path = null;
+ FSDataOutputStream fdos = null;
+
+ // Try to create a FS output stream
+ for (int attempt = 0; attempt < 10; attempt++) {
+ path = new Path(rootPath, FileUtils.getRandomFilename(prefix));
+ try
+ catch (Exception e) {
+ latestException = e;
+ }
+ }
+
+ if (fdos == null)
+
+ boolean success = false;
+ try (DataOutputStream dos = new DataOutputStream(fdos))
finally {
+ fdos.close();
+
+ if (!success && fileSystem.exists(path)) {
+ if (!fileSystem.delete(path, true))
+ }
+ }
+
+ return path.toString();
+ }
+
+ @Override
+ public Savepoint loadSavepoint(String path) throws IOException {
+ Preconditions.checkNotNull(path, "Path");
+
+ try {
+ try (FSDataInputStream fdis = createFsInputStream(new Path(path))) {
+ try (DataInputStream dis = new DataInputStream(fdis)) {
— End diff –
True :smile:
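For reference, two nested try-with-resources blocks like the ones quoted above can also be written as a single statement that declares both resources. Java closes them in reverse declaration order. Here is a minimal sketch with a hypothetical `readFirstInt` helper:

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

class FlatTryWithResourcesSketch {

    /** Reads the first big-endian int from {@code source} and closes both streams. */
    static int readFirstInt(InputStream source) throws IOException {
        // Both resources in one statement; dis is closed first, then in.
        try (InputStream in = source;
             DataInputStream dis = new DataInputStream(in)) {
            return dis.readInt();
        }
    }
}
```

Closing a `DataInputStream` already closes the wrapped stream, so the outer resource is closed twice. This is harmless for typical input streams.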
Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/2194#discussion_r69725285
— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStore.java —
@@ -0,0 +1,225 @@
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A file system based {@link SavepointStore}.
+ *
+ * <p>Stored savepoints have the following format:
+ * <pre>
+ * MagicNumber SavepointVersion Savepoint
+ * - MagicNumber => int
+ * - SavepointVersion => int (returned by Savepoint#getVersion())
+ * - Savepoint => bytes (serialized via version-specific SavepointSerializer)
+ * </pre>
+ */
+public class FsSavepointStore implements SavepointStore {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FsSavepointStore.class);
+
+ /** Magic number for sanity checks against stored savepoints. */
+ int MAGIC_NUMBER = 0x4960672d;
+
+ /** Root path for savepoints. */
+ private final Path rootPath;
+
+ /** Prefix for savepoint files. */
+ private final String prefix;
+
+ /** File system to use for file access. */
+ private final FileSystem fileSystem;
+
+ /**
+ * Creates a new file system based {@link SavepointStore}.
+ *
+ * @param rootPath Root path for savepoints
+ * @param prefix Prefix for savepoint files
+ * @throws IOException On failure to access root path
+ */
+ FsSavepointStore(String rootPath, String prefix) throws IOException
+
+ @Override
+ public <T extends Savepoint> String storeSavepoint(T savepoint) throws IOException {
+ Preconditions.checkNotNull(savepoint, "Savepoint");
+
+ Exception latestException = null;
+ Path path = null;
+ FSDataOutputStream fdos = null;
+
+ // Try to create a FS output stream
+ for (int attempt = 0; attempt < 10; attempt++) {
+ path = new Path(rootPath, FileUtils.getRandomFilename(prefix));
+ try
+ catch (Exception e) {
+ latestException = e;
+ }
+ }
+
+ if (fdos == null)
+
+ boolean success = false;
+ try (DataOutputStream dos = new DataOutputStream(fdos))
finally {
+ fdos.close();
+
+ if (!success && fileSystem.exists(path)) {
+ if (!fileSystem.delete(path, true))
+ }
+ }
+
+ return path.toString();
+ }
+
+ @Override
+ public Savepoint loadSavepoint(String path) throws IOException {
+ Preconditions.checkNotNull(path, "Path");
+
+ try {
+ try (FSDataInputStream fdis = createFsInputStream(new Path(path))) {
+ try (DataInputStream dis = new DataInputStream(fdis)) {
+ int magicNumber = dis.readInt();
+ int version = dis.readInt();
+
+ if (magicNumber == MAGIC_NUMBER)
+ else {
+ throw new IllegalStateException("Unexpected magic number. This indicates " +
+ "that the specified file is not a proper savepoint or " +
+ "the file has been corrupted.");
+ }
+ }
+ }
+ } catch (Throwable t) {
+ // Flink 1.0 did not store a header, check if it is an old savepoint.
+ // Only after this fails as well we can be sure that it is an actual
+ // failure.
+ Savepoint savepoint = tryLoadSavepointV0(path);
+ if (savepoint == null)
+ else {
+ return savepoint;
+ }
+ }
+ }
+
+ @Override
+ public void disposeSavepoint(String path, ClassLoader classLoader) throws Exception {
+ Preconditions.checkNotNull(path, "Path");
+ Preconditions.checkNotNull(classLoader, "Class loader");
+
+ try {
+ Savepoint savepoint = loadSavepoint(path);
+ savepoint.dispose(classLoader);
+
+ Path filePath = new Path(path);
+
+ if (fileSystem.exists(filePath)) {
+ if (!fileSystem.delete(filePath, true))
+ } else {
+ throw new IllegalArgumentException("Invalid path '" + filePath.toUri() + "'.");
+ }
+ } catch (Throwable t) {
+ throw new IOException("Failed to dispose savepoint " + path + ".", t);
+ }
+ }
+
+ @Override
+ public void shutDown() throws Exception
+
+ private FSDataInputStream createFsInputStream(Path path) throws IOException {
+ if (fileSystem.exists(path))
+ else {
+ throw new IllegalArgumentException("Invalid path '" + path.toUri() + "'.");
+ }
+ }
+
+ /**
+ * Tries to load a Flink 1.0 savepoint, which was not stored with a header.
+ *
+ * <p>This can be removed for Flink 1.2.
— End diff –
True, that should be defined outside of the code, and we will probably want to keep it backwards compatible as long as possible. I will remove all comments like this.
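The following sketch works on byte arrays and shows the header layout documented in the `FsSavepointStore` Javadoc (magic number, then version, then the serialized savepoint) together with a check for the headerless Flink 1.0 case. The magic number comes from the diff. `SavepointHeaderSketch`, `writeWithHeader` and `detectVersion` are hypothetical. A Flink 1.0 savepoint written with `ObjectOutputStream` starts with the Java serialization stream header `0xACED0005`, so its first `int` cannot equal the magic number:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.OptionalInt;

class SavepointHeaderSketch {

    /** Magic number from the FsSavepointStore diff. */
    static final int MAGIC_NUMBER = 0x4960672d;

    /** Writes MagicNumber, SavepointVersion and the already-serialized savepoint bytes. */
    static byte[] writeWithHeader(int version, byte[] serializedSavepoint) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(bytes)) {
            dos.writeInt(MAGIC_NUMBER);
            dos.writeInt(version);
            dos.write(serializedSavepoint);
        }
        return bytes.toByteArray();
    }

    /**
     * Returns the version from the header, or empty if the data does not start with
     * the magic number (i.e. it may be a headerless Flink 1.0 savepoint).
     */
    static OptionalInt detectVersion(byte[] file) throws IOException {
        if (file.length < Integer.BYTES) {
            return OptionalInt.empty(); // too short to hold a magic number
        }
        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(file))) {
            if (dis.readInt() != MAGIC_NUMBER) {
                return OptionalInt.empty();
            }
            return OptionalInt.of(dis.readInt()); // EOFException if the version is truncated
        }
    }
}
```

Returning an `OptionalInt` instead of a sentinel keeps every `int` usable as a savepoint version, which matters because the quoted test registers serializers under arbitrary random versions.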
Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/2194#discussion_r69727662
— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStore.java —
@@ -0,0 +1,225 @@
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A file system based {@link SavepointStore}.
+ *
+ * <p>Stored savepoints have the following format:
+ * <pre>
+ * MagicNumber SavepointVersion Savepoint
+ * - MagicNumber => int
+ * - SavepointVersion => int (returned by Savepoint#getVersion())
+ * - Savepoint => bytes (serialized via version-specific SavepointSerializer)
+ * </pre>
+ */
+public class FsSavepointStore implements SavepointStore {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FsSavepointStore.class);
+
+ /** Magic number for sanity checks against stored savepoints. */
+ int MAGIC_NUMBER = 0x4960672d;
+
+ /** Root path for savepoints. */
+ private final Path rootPath;
+
+ /** Prefix for savepoint files. */
+ private final String prefix;
+
+ /** File system to use for file access. */
+ private final FileSystem fileSystem;
+
+ /**
+ * Creates a new file system based {@link SavepointStore}.
+ *
+ * @param rootPath Root path for savepoints
+ * @param prefix Prefix for savepoint files
+ * @throws IOException On failure to access root path
+ */
+ FsSavepointStore(String rootPath, String prefix) throws IOException
+
+ @Override
+ public <T extends Savepoint> String storeSavepoint(T savepoint) throws IOException {
+ Preconditions.checkNotNull(savepoint, "Savepoint");
+
+ Exception latestException = null;
+ Path path = null;
+ FSDataOutputStream fdos = null;
+
+ // Try to create a FS output stream
+ for (int attempt = 0; attempt < 10; attempt++) {
+ path = new Path(rootPath, FileUtils.getRandomFilename(prefix));
+ try
+ catch (Exception e) {
+ latestException = e;
+ }
+ }
+
+ if (fdos == null)
+
+ boolean success = false;
+ try (DataOutputStream dos = new DataOutputStream(fdos))
finally {
+ fdos.close();
+
+ if (!success && fileSystem.exists(path)) {
+ if (!fileSystem.delete(path, true))
+ }
+ }
+
+ return path.toString();
+ }
+
+ @Override
+ public Savepoint loadSavepoint(String path) throws IOException {
+ Preconditions.checkNotNull(path, "Path");
+
+ try {
+ try (FSDataInputStream fdis = createFsInputStream(new Path(path))) {
+ try (DataInputStream dis = new DataInputStream(fdis)) {
+ int magicNumber = dis.readInt();
+ int version = dis.readInt();
+
+ if (magicNumber == MAGIC_NUMBER)
+ else {
+ throw new IllegalStateException("Unexpected magic number. This indicates " +
+ "that the specified file is not a proper savepoint or " +
+ "the file has been corrupted.");
+ }
+ }
+ }
+ } catch (Throwable t) {
— End diff –
Good point, especially with opening the stream etc.
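One way to narrow the `catch (Throwable t)` is sketched below. The Flink 1.0 fallback then runs only when the magic number does not match, and errors from opening or reading the file still propagate. `NarrowFallbackSketch`, `StreamOpener` and `readHeaderVersion` are hypothetical names, and an empty result marks the point where a real implementation would invoke the V0 loader:

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.OptionalInt;

class NarrowFallbackSketch {

    /** Magic number from the FsSavepointStore diff. */
    static final int MAGIC_NUMBER = 0x4960672d;

    /** Hypothetical source of the savepoint bytes, e.g. a file on a distributed file system. */
    interface StreamOpener {
        InputStream open() throws IOException;
    }

    /**
     * Returns the header version, or empty if the magic number does not match.
     * Failures to open or read the stream (including EOFException for a truncated
     * file) propagate to the caller instead of triggering the fallback.
     */
    static OptionalInt readHeaderVersion(StreamOpener opener) throws IOException {
        try (DataInputStream dis = new DataInputStream(opener.open())) {
            if (dis.readInt() != MAGIC_NUMBER) {
                return OptionalInt.empty(); // no header: caller may try the V0 format
            }
            return OptionalInt.of(dis.readInt());
        }
    }
}
```

A caller would run the V0 loader only when the result is empty. A missing or unreadable file then surfaces as its original exception rather than as a confusing failure of the legacy code path.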
Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/2194#discussion_r69728188
— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java —
@@ -0,0 +1,71 @@
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+/**
+ * Savepoint store used to persist {@link Savepoint} instances.
+ *
+ * <p>The main implementation is the {@link FsSavepointStore}. We also have the
+ *
for historical reasons (introduced in Flink 1.0).
+ */
+public interface SavepointStore {
+
+ /**
+ * Stores the savepoint.
+ *
+ * @param savepoint Savepoint to be stored
+ * @param <T> Savepoint type
+ * @return Path of stored savepoint
+ * @throws Exception Failures during store are forwarded
+ */
+ <T extends Savepoint> String storeSavepoint(T savepoint) throws Exception;
+
+ /**
+ * Loads the savepoint at the specified path.
+ *
+ * @param path Path of savepoint to load
+ * @return The loaded savepoint
+ * @throws Exception Failures during load are forwarded
+ */
+ Savepoint loadSavepoint(String path) throws Exception;
+
+ /**
+ * Disposes the savepoint at the specified path.
+ *
+ * <p>The class loader is needed, because savepoints can currently point to
+ * arbitrary snapshot
+ * instances, which need the user code class loader for deserialization.
+ *
+ * @param path Path of savepoint to dispose
+ * @param classLoader Class loader for disposal
+ * @throws Exception Failures during disposal are forwarded
+ */
+ void disposeSavepoint(String path, ClassLoader classLoader) throws Exception;
+
+ /**
+ * Shuts down the savepoint store.
+ *
+ * <p>Only necessary for implementations where the savepoint life-cycle is
+ * bound to the cluster life-cycle.
+ *
+ * @throws Exception Failures during shut down are forwarded
+ */
+ void shutDown() throws Exception;
— End diff –
We have it as `shutDown` in other places as well, but it's true that there are many places with `shutdown`. Since the method name describes an action, I would like to keep it as two words: shut down.
Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/2194#discussion_r69728283
— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Serializer.java —
@@ -0,0 +1,58 @@
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.OutputStream;
+
+/**
+ * Serializer for {@link SavepointV0} instances.
+ *
+ * <p>This format was introduced with Flink 1.0.0 and simply wraps a
+ * {@link CompletedCheckpoint} instance and uses plain Java serialization.
+ * That's why we have to stick to the exact same format, including classes,
+ * which are only part of our code base for backwards compatibility.
+ */
+public class SavepointV0Serializer implements SavepointSerializer<SavepointV0> {
+
+ public static final SavepointV0Serializer INSTANCE = new SavepointV0Serializer();
— End diff –
Forgot that, will add the private constructor
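The singleton pattern agreed on here looks like this as a minimal sketch. A stateless serializer is exposed through a static `INSTANCE` field, and a private constructor prevents any other instances from being created. `SimpleCodec` and `CheckpointIdCodec` are hypothetical stand-ins, not Flink's `SavepointSerializer`.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/** Hypothetical serializer interface for this sketch. */
interface SimpleCodec<T> {
    void serialize(T value, OutputStream os) throws IOException;

    T deserialize(InputStream is) throws IOException;
}

/** Stateless, so one shared instance is enough. */
final class CheckpointIdCodec implements SimpleCodec<Long> {

    static final CheckpointIdCodec INSTANCE = new CheckpointIdCodec();

    // Private constructor: callers must go through INSTANCE.
    private CheckpointIdCodec() {
    }

    @Override
    public void serialize(Long checkpointId, OutputStream os) throws IOException {
        DataOutputStream dos = new DataOutputStream(os);
        dos.writeLong(checkpointId);
        // Flush but do not close: the caller owns the underlying stream.
        dos.flush();
    }

    @Override
    public Long deserialize(InputStream is) throws IOException {
        // Not closed for the same reason: closing the wrapper would close the caller's stream.
        return new DataInputStream(is).readLong();
    }
}
```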
Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/2194#discussion_r69728416
— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Serializer.java —
@@ -0,0 +1,58 @@
+
+ @Override
+ public void serialize(SavepointV0 savepoint, OutputStream os) throws IOException
+
+ @Override
+ public SavepointV0 deserialize(InputStream is) throws IOException {
+ try (ObjectInputStream ois = new ObjectInputStream(is)) {
+ try {
— End diff –
Oh yes 😄
Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/2194#discussion_r69728549
— Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java —
@@ -0,0 +1,308 @@
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.StateForTask;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.FileUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Matchers;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+public class FsSavepointStoreTest {
+
+ @Rule
+ public TemporaryFolder tmp = new TemporaryFolder();
+
+ /**
+ * Tests a store-load-dispose sequence.
+ */
+ @Test
+ public void testStoreLoadDispose() throws Exception
+
+ /**
+ * Tests loading of a Flink 1.0 savepoint.
+ */
+ @Test
+ public void testLoadFlink10Savepoint() throws Exception {
+ // Copied from Flink 1.0
+ CompletedCheckpoint checkpoint = SavepointV0Test.createCompletedCheckpoint(
+ new JobID(),
+ 10210230,
+ System.currentTimeMillis(),
+ System.currentTimeMillis(),
+ 32);
+
+ // Copied from Flink 1.0 savepoint serialization code path
+ Path filePath = new Path(tmp.getRoot().getPath(), FileUtils.getRandomFilename("fs-savepoint-store-test-"));
+ FSDataOutputStream outStream = FileSystem.get(filePath.toUri()).create(filePath, false);
+ try (ObjectOutputStream os = new ObjectOutputStream(outStream))
+
+ // Load with savepoint store
+ FsSavepointStore store = new FsSavepointStore(tmp.getRoot().getPath(), "fs-savepoint-store-test-");
+ SavepointV0 savepoint = (SavepointV0) store.loadSavepoint(filePath.toString());
+
+ // Verify all expected task states
+ for (StateForTask stateForTask : checkpoint.getStates()) {
+ JobVertexID expectedJobVertexId = stateForTask.getOperatorId();
+ int subTaskIndex = stateForTask.getSubtask();
+
+ boolean foundMatch = false;
+ for (TaskState taskState : savepoint.getTaskStates()) {
+ if (taskState.getJobVertexID().equals(expectedJobVertexId))
+ }
+
+ assertTrue("Did not find TaskState for " + stateForTask, foundMatch);
+ }
+ }
+
+ /**
+ * Tests loading with unexpected file types.
+ */
+ @Test
+ public void testUnexpectedFileType() throws Exception {
+ FsSavepointStore store = new FsSavepointStore(tmp.getRoot().getPath(), "fs-savepoint-store-test-");
+
+ // Random file
+ Path filePath = new Path(tmp.getRoot().getPath(), UUID.randomUUID().toString());
+ FSDataOutputStream fdos = FileSystem.get(filePath.toUri()).create(filePath, false);
+ DataOutputStream dos = new DataOutputStream(fdos);
+ for (int i = 0; i < 10; i++)
+
+ try
+
+ // Very short file (EOFException)
+ filePath = new Path(tmp.getRoot().getPath(), UUID.randomUUID().toString());
+ fdos = FileSystem.get(filePath.toUri()).create(filePath, false);
+ dos = new DataOutputStream(fdos);
+
+ dos.writeByte(0);
+
+ try {
+ store.loadSavepoint(filePath.toString());
+ fail("Did not throw expected Exception");
+ } catch (IOException e) {
+ // Unexpected magic number
+ assertTrue(e.getCause() instanceof EOFException);
+ }
+ }
+
+ /**
+ * Tests addition of a new savepoint version.
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testMultipleSavepointVersions() throws Exception {
+ Field field = SavepointSerializers.class.getDeclaredField("SERIALIZERS");
+ field.setAccessible(true);
+ Map<Integer, SavepointSerializer<?>> serializers = (Map<Integer, SavepointSerializer<?>>) field.get(null);
+
+ assertTrue(serializers.size() >= 1);
+
+ FsSavepointStore store = new FsSavepointStore(tmp.getRoot().getPath(), "fs-savepoint-store-test-");
+ assertEquals(0, tmp.getRoot().listFiles().length);
+
+ // New savepoint type for test
+ int version = ThreadLocalRandom.current().nextInt();
+ long checkpointId = ThreadLocalRandom.current().nextLong();
+
+ // Add serializer
+ serializers.put(version, NewSavepointSerializer.INSTANCE);
+
+ TestSavepoint newSavepoint = new TestSavepoint(version, checkpointId);
+ String pathNewSavepoint = store.storeSavepoint(newSavepoint);
+ assertEquals(1, tmp.getRoot().listFiles().length);
+
+ // Savepoint v0
— End diff –
True
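`testMultipleSavepointVersions` registers an extra serializer by reaching into the static `SERIALIZERS` map via reflection. Below is a stripped-down sketch of that kind of version-keyed lookup, assuming a plain static map. `CheckpointIdFormats` and its two toy formats are hypothetical. They only illustrate two points: each version may encode the same value differently, and an unknown version should fail with a descriptive message.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Hypothetical version -> decoder registry; both formats are invented for illustration.
final class CheckpointIdFormats {

    // TreeMap keeps the key set sorted, which makes the error message stable.
    private static final Map<Integer, Function<byte[], Long>> DECODERS = new TreeMap<>();

    static {
        // Version 1: 8-byte big-endian long
        DECODERS.put(1, bytes -> ByteBuffer.wrap(bytes).getLong());
        // Version 2: decimal digits encoded as UTF-8
        DECODERS.put(2, bytes -> Long.parseLong(new String(bytes, StandardCharsets.UTF_8)));
    }

    private CheckpointIdFormats() {
    }

    /** Looks up the decoder for a version and fails loudly for unknown versions. */
    static long decode(int version, byte[] payload) {
        Function<byte[], Long> decoder = DECODERS.get(version);
        if (decoder == null) {
            throw new IllegalArgumentException(
                    "Unknown savepoint version " + version + "; known versions: " + DECODERS.keySet());
        }
        return decoder.apply(payload);
    }
}
```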
Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/2194#discussion_r69728577
— Diff: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java —
@@ -0,0 +1,308 @@
+ /**
+ * Tests addition of a new savepoint version.
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testMultipleSavepointVersions() throws Exception
+
+ /**
+ * Tests that an exception during store cleans up any created files.
— End diff –
Misleading formulation in the comment; that's exactly what's tested.
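The behaviour under test is that a store which fails part-way must not leave a file behind. It can be sketched as follows, assuming a local file system via `java.nio.file` instead of Flink's `FileSystem` abstraction. `CleanupOnFailure` and `PayloadWriter` are hypothetical names.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical sketch; not Flink's FsSavepointStore. */
final class CleanupOnFailure {

    /** Writes the savepoint bytes; may fail part-way through. */
    interface PayloadWriter {
        void write(OutputStream os) throws IOException;
    }

    private CleanupOnFailure() {
    }

    /** Writes dir/name and deletes the file again if writing fails. */
    static Path store(Path dir, String name, PayloadWriter writer) throws IOException {
        Path file = dir.resolve(name);
        try (OutputStream os = Files.newOutputStream(file)) {
            writer.write(os);
        } catch (IOException | RuntimeException e) {
            // try-with-resources has already closed the stream here, so the partial file can be deleted.
            try {
                Files.deleteIfExists(file);
            } catch (IOException deleteFailure) {
                // Keep the original failure and record the cleanup failure alongside it.
                e.addSuppressed(deleteFailure);
            }
            throw e;
        }
        return file;
    }
}
```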
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2194#discussion_r69734443
— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java —
@@ -0,0 +1,71 @@
+ void shutDown() throws Exception;
— End diff –
A quick search in the code base revealed that we have 5 usages of `shutDown()` and 100+ usages of `shutdown()`. I don't want to be hairsplitting, but I think the current convention is to write it without a capital letter.
Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/2194#discussion_r69735446
— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java —
@@ -0,0 +1,71 @@
+ void shutDown() throws Exception;
— End diff –
But the convention for virtually all other methods is that method names are typically verbs or verb phrases. But I don't want to be hairsplitting. I'll rename it.
Github user uce commented on the issue:
https://github.com/apache/flink/pull/2194
Thanks for the review again. I've addressed all comments. If you have time, it would be great to have a look over the change in 157c4e1. I've run all tests locally after reverting FLINK-3623 and everything passes.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2194#discussion_r69753840
— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java —
@@ -0,0 +1,71 @@
+ void shutDown() throws Exception;
— End diff –
I think shutdown is a verb in fact.
Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/2194#discussion_r69755315
— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java —
@@ -0,0 +1,71 @@
+ void shutDown() throws Exception;
— End diff –
http://www.dict.cc/?s=shut+down vs. http://www.dict.cc/?s=shutdown, but I really don't care :smile: Let's just leave it as `shutdown`.
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2194#discussion_r69867220
— Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java —
@@ -0,0 +1,71 @@
+ void shutDown() throws Exception;
— End diff –
Hmm dict.leo.org says it differently: http://dict.leo.org/ende/index_de.html#/search=shutdown&searchLoc=0&resultOrder=basic&multiwordShowSingle=on&pos=0
But on the other hand there is also this site: http://notaverb.com/shutdown.
Github user uce commented on the issue:
https://github.com/apache/flink/pull/2194
If there are no objections, I would like to merge this.
Github user uce commented on the issue:
https://github.com/apache/flink/pull/2194
After talking with @tillrohrmann and @StephanEwen, we discovered some obvious cases where restoring from 1.0 savepoints won't work in 1.1, for example if window state is involved, which is serialized in a different format than in 1.0. I think the same is true for RocksDB.
Should we throw an Exception by default when a user tries to restore a Flink 1.0 savepoint with 1.1? On the other hand, there are cases where it would work as expected, but if it does not work, the error message will not be very helpful.
Github user uce commented on the issue:
https://github.com/apache/flink/pull/2194
A magic number mismatch will fail with an error message stating that a likely cause is resuming from a Flink 1.0 savepoint, which is not supported. I've removed the legacy classes and undid the renamings. `SavepointV1` (Flink 1.1) has been downgraded to `SavepointV0`, and the previous `SavepointV0` (for Flink 1.0) has been removed.
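One way to make the mismatch message more specific is offered here as a hypothetical refinement rather than what the PR does. A header-less Flink 1.0 savepoint is plain Java serialization, and `java.io.ObjectOutputStream` always starts with `STREAM_MAGIC` (`0xACED`) followed by `STREAM_VERSION` (`5`). Reading the first `int` of such a file therefore yields `0xACED0005`. `HeaderCheck` and its magic number value are assumptions for this sketch.

```java
/** Hypothetical sketch; names and the magic number value are illustrative only. */
final class HeaderCheck {

    /** Arbitrary value chosen for this sketch. */
    static final int MAGIC_NUMBER = 0x5A5A5A5A;

    /**
     * First four bytes of any java.io.ObjectOutputStream output:
     * STREAM_MAGIC (0xACED) followed by STREAM_VERSION (0x0005).
     */
    static final int JAVA_SERIALIZATION_HEADER = 0xACED0005;

    private HeaderCheck() {
    }

    /** Throws an IllegalStateException with a hint about the likely cause on mismatch. */
    static void checkMagicNumber(int magicNumber) {
        if (magicNumber == MAGIC_NUMBER) {
            return;
        }
        String hint = magicNumber == JAVA_SERIALIZATION_HEADER
                ? "The file looks like a header-less Flink 1.0 savepoint, which is not supported."
                : "The file is not a savepoint or has been corrupted.";
        throw new IllegalStateException(
                String.format("Unexpected magic number 0x%08X. %s", magicNumber, hint));
    }
}
```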
GitHub user uce opened a pull request:
https://github.com/apache/flink/pull/2194
FLINK-4067 [runtime] Add savepoint headers
Savepoints were previously persisted without any meta data using default Java serialization of a `CompletedCheckpoint`. This PR introduces a savepoint class with version-specific serializers and stores savepoints with meta data.
Savepoints expose a version number and a `Collection<TaskState>` for savepoint restore.
Currently, there are two savepoint versions:
The savepoints are stored in `FsSavepointStore` with the following format:
```
MagicNumber SavepointVersion Savepoint
```
The header is minimal (magic number, version). All savepoint-specific meta data can be moved to the savepoint itself. This is also where we would have to add new meta data in future versions, allowing us to differentiate between savepoint versions when we change the serialization stack etc.
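Here is a minimal sketch of the `MagicNumber SavepointVersion Savepoint` layout. It assumes `DataOutputStream`/`DataInputStream` for the two `int` header fields and, purely for illustration, a payload that is a single checkpoint id. `SavepointFile`, its `Contents` holder and the magic number value are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical sketch of the "MagicNumber SavepointVersion Savepoint" layout. */
final class SavepointFile {

    /** Arbitrary value chosen for this sketch. */
    static final int MAGIC_NUMBER = 0x5A5A5A5A;

    /** Decoded file contents; the payload is just a checkpoint id here. */
    static final class Contents {
        final int version;
        final long checkpointId;

        Contents(int version, long checkpointId) {
            this.version = version;
            this.checkpointId = checkpointId;
        }
    }

    private SavepointFile() {
    }

    static byte[] write(int version, long checkpointId) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(bos)) {
            dos.writeInt(MAGIC_NUMBER);  // header: magic number (4 bytes)
            dos.writeInt(version);       // header: savepoint version (4 bytes)
            dos.writeLong(checkpointId); // version-specific payload (8 bytes here)
        }
        return bos.toByteArray();
    }

    static Contents read(byte[] bytes) throws IOException {
        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes))) {
            int magicNumber = dis.readInt();
            if (magicNumber != MAGIC_NUMBER) {
                throw new IOException("Unexpected magic number: " + magicNumber);
            }
            int version = dis.readInt();
            // A real store would look up a version-specific deserializer here;
            // this sketch only knows one payload format.
            long checkpointId = dis.readLong();
            return new Contents(version, checkpointId);
        }
    }
}
```

Because the version comes right after the magic number, a reader can pick the version-specific deserializer before touching the payload. The payload format itself can then change freely between versions.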
I've tested, both manually and via a unit test, that a savepoint triggered with Flink 1.0 can be resumed with Flink 1.1.
All savepoint-related classes have been moved from `checkpoint` to a new subpackage `checkpoint.savepoint`.
The main classes to look at to review this PR are `Savepoint` and subclasses, `SavepointSerializer` and subclasses, and `FsSavepointStore`. @tillrohrmann do you have time to do this?
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/uce/flink 4067-savepoint_header
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/2194.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2194
commit af0d418ba2bc3591f0526476d1ad7dfc4160c205
Author: Ufuk Celebi <uce@apache.org>
Date: 2016-06-30T14:16:41Z
FLINK-4067 [runtime] Add savepoint headers