Details
Type: Bug
Status: Resolved
Priority: Critical
Resolution: Fixed
Versions: 1.17.0, 1.18.0, 1.19.0, 1.20.0
Description
With Flink 1.20 we observed another checkpoint corruption bug. This is similar to FLINK-35217, but affects only files written by the taskmanager (the ones with random names as described here).
After a system crash, the files written by the taskmanager may be corrupted (file size of 0 bytes) if the changes in the file-system cache have not yet been written to disk. The "_metadata" file written by the jobmanager is always fine because it is properly fsynced.
Investigation revealed that "fsync" is missing, this time in "FsCheckpointStreamFactory". In this case the "OutputStream" is closed without calling "fsync", thus the file is not durably persisted on disk before the checkpoint is completed. (As previously established in FLINK-35217, calling "fsync" is necessary as simply closing the stream does not have any guarantees on persistence.)
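To illustrate the difference between closing and syncing, here is a minimal sketch in plain Java, not Flink's actual code. The class name "DurableWrite" and the helper "writeDurably" are hypothetical; the only point is that "FileDescriptor.sync()" is called before the stream is closed.

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class DurableWrite {

    /**
     * Hypothetical helper: writes data to the file and asks the OS to push it
     * to the storage device before the stream is closed.
     */
    static void writeDurably(Path path, byte[] data) throws IOException {
        try (FileOutputStream out = new FileOutputStream(path.toFile())) {
            out.write(data);
            // Push any buffered bytes from the stream to the OS.
            out.flush();
            // Issues fsync on the underlying file descriptor. Without this call,
            // close() alone may leave the data only in the file-system cache.
            out.getFD().sync();
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("state-", ".bin");
        writeDurably(file, "serialized state".getBytes(StandardCharsets.UTF_8));
        System.out.println("wrote " + Files.size(file) + " bytes to " + file);
        Files.delete(file);
    }
}
```

Running this under "strace -f -e trace=openat,fsync,close" should show an "fsync" on the file descriptor before its "close".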
"strace" on the taskmanager's process confirms this behavior:
1. The directory for checkpoint chk-1217 is created by the "mkdir" call.
2. The non-inline state of checkpoint chk-1217 is written by the taskmanager, starting at the "openat" call; the filename is "0507881e-8877-40b0-82d6-3d7dead64ccc". Note that there is no "fsync" before "close".
3. Checkpoint chk-1217 is finished; its "_metadata" file is written and synced properly.
4. The old checkpoint chk-1216 is deleted by the "unlink" calls.
The new checkpoint chk-1217 now references an unsynced file that can be corrupted by, for example, a power loss. Because the old checkpoint has already been deleted, no working checkpoint is left.
For durable persistence an "fsync" call is missing before "close" in step 2.
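The ordering that the steps above rely on can be sketched as follows. This is an illustration in plain Java under simplifying assumptions, not Flink's implementation: the class "CheckpointOrderSketch", the method names, and the file name "state-file" are hypothetical, and syncing the directory entries themselves is left out.

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class CheckpointOrderSketch {

    /** Writes the bytes and calls fsync before the stream is closed. */
    static void writeAndSync(Path file, byte[] data) throws IOException {
        try (FileOutputStream out = new FileOutputStream(file.toFile())) {
            out.write(data);
            out.getFD().sync();
        }
    }

    /** Completes checkpoint "id" and only then removes checkpoint "id - 1". */
    static Path completeCheckpoint(Path root, long id, byte[] state, byte[] metadata)
            throws IOException {
        // Step 1: create the checkpoint directory.
        Path dir = Files.createDirectories(root.resolve("chk-" + id));

        // Step 2: write the state file and sync it (the call missing in the trace).
        writeAndSync(dir.resolve("state-file"), state);

        // Step 3: write and sync the metadata under a temporary name, then rename it.
        Path inProgress = dir.resolve("._metadata.inprogress");
        writeAndSync(inProgress, metadata);
        Files.move(inProgress, dir.resolve("_metadata"), StandardCopyOption.ATOMIC_MOVE);

        // Step 4: delete the previous checkpoint only after the new one is on disk.
        Path previous = root.resolve("chk-" + (id - 1));
        if (Files.isDirectory(previous)) {
            try (DirectoryStream<Path> entries = Files.newDirectoryStream(previous)) {
                for (Path entry : entries) {
                    Files.delete(entry);
                }
            }
            Files.delete(previous);
        }
        return dir;
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("statestore");
        completeCheckpoint(root, 1216, new byte[] {1}, new byte[] {2});
        Path latest = completeCheckpoint(root, 1217, new byte[] {3}, new byte[] {4});
        System.out.println("latest checkpoint: " + latest);
        System.out.println("old checkpoint still exists: "
                + Files.exists(root.resolve("chk-1216")));
    }
}
```

The point of the ordering is that step 4 is safe only if every file referenced by the new "_metadata" has been synced in step 2.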
Full "strace" log:
[pid 947250] 08:22:58 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217", 0x7f68414c5b50) = -1 ENOENT (No such file or directory)
[pid 947250] 08:22:58 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217", 0x7f68414c5b50) = -1 ENOENT (No such file or directory)
[pid 947250] 08:22:58 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502", {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
[pid 947250] 08:22:58 mkdir("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217", 0777) = 0
[pid 1303248] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/0507881e-8877-40b0-82d6-3d7dead64ccc", 0x7f56f08d5610) = -1 ENOENT (No such file or directory)
[pid 1303248] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217", {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
[pid 1303248] 08:22:59 openat(AT_FDCWD, "/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/0507881e-8877-40b0-82d6-3d7dead64ccc", O_WRONLY|O_CREAT|O_TRUNC, 0666) = 199
[pid 1303248] 08:22:59 fstat(199, {st_mode=S_IFREG|0644, st_size=0, ...}) = 0
[pid 1303248] 08:22:59 close(199) = 0
[pid 947310] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/_metadata", 0x7f683fb378b0) = -1 ENOENT (No such file or directory)
[pid 947310] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/_metadata", 0x7f683fb37730) = -1 ENOENT (No such file or directory)
[pid 947310] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/._metadata.inprogress.e87ac62b-5f75-472b-ad21-9579a872b0d0", 0x7f683fb37730) = -1 ENOENT (No such file or directory)
[pid 947310] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217", {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
[pid 947310] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217", {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
[pid 947310] 08:22:59 openat(AT_FDCWD, "/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/._metadata.inprogress.e87ac62b-5f75-472b-ad21-9579a872b0d0", O_WRONLY|O_CREAT|O_EXCL, 0666) = 148
[pid 947310] 08:22:59 fsync(148) = 0
[pid 947310] 08:22:59 close(148) = 0
[pid 947310] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/._metadata.inprogress.e87ac62b-5f75-472b-ad21-9579a872b0d0", {st_mode=S_IFREG|0644, st_size=46265, ...}) = 0
[pid 947310] 08:22:59 rename("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/._metadata.inprogress.e87ac62b-5f75-472b-ad21-9579a872b0d0", "/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/_metadata") = 0
[pid 947310] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216/1a478755-43d1-4094-9283-db5e15fc0cbe", <unfinished ...>
[pid 947250] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216/_metadata", <unfinished ...>
[pid 947310] 08:22:59 <... stat resumed>{st_mode=S_IFREG|0644, st_size=54409, ...}) = 0
[pid 947250] 08:22:59 <... stat resumed>{st_mode=S_IFREG|0644, st_size=46265, ...}) = 0
[pid 947310] 08:22:59 unlink("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216/1a478755-43d1-4094-9283-db5e15fc0cbe" <unfinished ...>
[pid 947250] 08:22:59 unlink("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216/_metadata" <unfinished ...>
[pid 947310] 08:22:59 <... unlink resumed>) = 0
[pid 947250] 08:22:59 <... unlink resumed>) = 0
[pid 947250] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216", {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
[pid 947250] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216", {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
[pid 947250] 08:22:59 openat(AT_FDCWD, "/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216", O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 148
[pid 947250] 08:22:59 newfstatat(148, "", {st_mode=S_IFDIR|0755, st_size=4096, ...}, AT_EMPTY_PATH) = 0
[pid 947250] 08:22:59 close(148) = 0
[pid 947250] 08:22:59 stat("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216", <unfinished ...>
[pid 947201] 08:22:59 <... stat resumed>0x7f56f2069a20) = -1 ENOENT (No such file or directory)
[pid 947250] 08:22:59 <... stat resumed>{st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
[pid 947250] 08:22:59 openat(AT_FDCWD, "/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216", O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY <unfinished ...>
[pid 947250] 08:22:59 <... openat resumed>) = 148
[pid 947250] 08:22:59 newfstatat(148, "", <unfinished ...>
[pid 947250] 08:22:59 <... newfstatat resumed>{st_mode=S_IFDIR|0755, st_size=4096, ...}, AT_EMPTY_PATH) = 0
[pid 947250] 08:22:59 close(148) = 0
[pid 947250] 08:22:59 unlink("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216") = -1 EISDIR (Is a directory)
[pid 947250] 08:22:59 rmdir("/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1216") = 0
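Spotting a missing "fsync" in a long trace by eye is error-prone, so a check like this can be scripted. The following is a rough sketch, not part of Flink or strace: the class "StraceFsyncCheck" and its method are hypothetical, and it assumes one syscall per input line, with "openat", "fsync" and "close" for a file issued by the same pid and not split into "unfinished"/"resumed" pairs. The sample lines in "main" are copied from the trace above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class StraceFsyncCheck {

    // openat(...) calls that open a file for writing; captures pid, path and returned fd.
    private static final Pattern OPEN = Pattern.compile(
            "\\[pid\\s+(\\d+)\\].*openat\\(AT_FDCWD, \"([^\"]+)\", O_WRONLY[^)]*\\)\\s*=\\s*(\\d+)");

    // Successful fsync(fd) or close(fd) calls; captures pid, call name and fd.
    private static final Pattern FD_CALL = Pattern.compile(
            "\\[pid\\s+(\\d+)\\].*\\b(fsync|close)\\((\\d+)\\)\\s*=\\s*0");

    /** Returns the paths of files that were opened for writing and closed without fsync. */
    static List<String> findUnsyncedFiles(List<String> lines) {
        Map<String, String> openFiles = new HashMap<>(); // "pid:fd" -> path
        Set<String> synced = new HashSet<>();            // "pid:fd" keys that saw an fsync
        List<String> unsynced = new ArrayList<>();
        for (String line : lines) {
            Matcher open = OPEN.matcher(line);
            if (open.find()) {
                String key = open.group(1) + ":" + open.group(3);
                openFiles.put(key, open.group(2));
                synced.remove(key); // the fd number may have been reused
                continue;
            }
            Matcher call = FD_CALL.matcher(line);
            if (call.find()) {
                String key = call.group(1) + ":" + call.group(3);
                if (call.group(2).equals("fsync")) {
                    synced.add(key);
                } else {
                    boolean wasSynced = synced.remove(key);
                    String path = openFiles.remove(key);
                    if (path != null && !wasSynced) {
                        unsynced.add(path);
                    }
                }
            }
        }
        return unsynced;
    }

    public static void main(String[] args) {
        String dir = "/opt/flink/statestore/d0346e6c26416765a6038fa482b29502/chk-1217/";
        List<String> trace = Arrays.asList(
                "[pid 1303248] 08:22:59 openat(AT_FDCWD, \"" + dir
                        + "0507881e-8877-40b0-82d6-3d7dead64ccc\", O_WRONLY|O_CREAT|O_TRUNC, 0666) = 199",
                "[pid 1303248] 08:22:59 fstat(199, {st_mode=S_IFREG|0644, st_size=0, ...}) = 0",
                "[pid 1303248] 08:22:59 close(199) = 0",
                "[pid 947310] 08:22:59 openat(AT_FDCWD, \"" + dir
                        + "._metadata.inprogress.e87ac62b-5f75-472b-ad21-9579a872b0d0\", O_WRONLY|O_CREAT|O_EXCL, 0666) = 148",
                "[pid 947310] 08:22:59 fsync(148) = 0",
                "[pid 947310] 08:22:59 close(148) = 0");
        // Only the taskmanager's state file is reported; the metadata file was synced.
        for (String path : findUnsyncedFiles(trace)) {
            System.out.println("closed without fsync: " + path);
        }
    }
}
```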
Calling "sync()" when closing the stream in "FsCheckpointStreamFactory::closeAndGetHandle" fixes the problem by syncing the serialized state files before returning their reference to the jobmanager. The following commit fixes this: https://github.com/Planet-X/flink/commit/0d6e25a9738d9d4ee94de94e1437f92611b50758
Diff is also attached.
"strace" confirms that "fsync" is now called before the taskmanager's state file is closed, see line 9:
[pid 108807] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263", 0x7f2c167fc890) = -1 ENOENT (No such file or directory)
[pid 108807] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263", 0x7f2c167fc890) = -1 ENOENT (No such file or directory)
[pid 108807] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc", {st_mode=S_IFDIR|0755, st_size=44, ...}) = 0
[pid 108807] 13:14:59 mkdir("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263", 0777) = 0
[pid 110456] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/9504fadb-182e-4812-857a-3dffa2408222", 0x7f7d56efbe90) = -1 ENOENT (No such file or directory)
[pid 110456] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263", {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
[pid 110456] 13:14:59 openat(AT_FDCWD, "/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/9504fadb-182e-4812-857a-3dffa2408222", O_WRONLY|O_CREAT|O_TRUNC, 0666) = 268
[pid 110456] 13:14:59 fstat(268, {st_mode=S_IFREG|0644, st_size=0, ...}) = 0
[pid 110456] 13:14:59 fsync(268) = 0
[pid 110456] 13:14:59 close(268) = 0
[pid 108807] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/_metadata", 0x7f2c167fc710) = -1 ENOENT (No such file or directory)
[pid 108807] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/_metadata", <unfinished ...>
[pid 108807] 13:14:59 <... stat resumed>0x7f2c167fc670) = -1 ENOENT (No such file or directory)
[pid 108807] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/._metadata.inprogress.7a1ea631-6dbd-4c7e-a551-849947c39396", 0x7f2c167fc670) = -1 ENOENT (No such file or directory)
[pid 108807] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263", {st_mode=S_IFDIR|0755, st_size=72, ...}) = 0
[pid 108807] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263", {st_mode=S_IFDIR|0755, st_size=72, ...}) = 0
[pid 108807] 13:14:59 openat(AT_FDCWD, "/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/._metadata.inprogress.7a1ea631-6dbd-4c7e-a551-849947c39396", O_WRONLY|O_CREAT|O_EXCL, 0666) = 168
[pid 108807] 13:14:59 fsync(168) = 0
[pid 108807] 13:14:59 close(168) = 0
[pid 108807] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/._metadata.inprogress.7a1ea631-6dbd-4c7e-a551-849947c39396", {st_mode=S_IFREG|0644, st_size=21416, ...}) = 0
[pid 108807] 13:14:59 rename("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/._metadata.inprogress.7a1ea631-6dbd-4c7e-a551-849947c39396", "/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-263/_metadata") = 0
[pid 108823] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262/_metadata", {st_mode=S_IFREG|0644, st_size=36684, ...}) = 0
[pid 108823] 13:14:59 unlink("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262/_metadata") = 0
[pid 108823] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262", {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
[pid 108823] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262", {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
[pid 108823] 13:14:59 openat(AT_FDCWD, "/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262", O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 168
[pid 108823] 13:14:59 newfstatat(168, "", {st_mode=S_IFDIR|0755, st_size=0, ...}, AT_EMPTY_PATH) = 0
[pid 108823] 13:14:59 close(168) = 0
[pid 108823] 13:14:59 stat("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262", {st_mode=S_IFDIR|0755, st_size=0, ...}) = 0
[pid 108823] 13:14:59 openat(AT_FDCWD, "/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262", O_RDONLY|O_NONBLOCK|O_CLOEXEC|O_DIRECTORY) = 168
[pid 108823] 13:14:59 newfstatat(168, "", {st_mode=S_IFDIR|0755, st_size=0, ...}, AT_EMPTY_PATH) = 0
[pid 108823] 13:14:59 close(168) = 0
[pid 108823] 13:14:59 unlink("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262") = -1 EISDIR (Is a directory)
[pid 108823] 13:14:59 rmdir("/opt/flink/statestore/61dc13952604177c452c114faec25afc/chk-262") = 0
Presumably only checkpoints with larger state sizes are affected, because small state is inlined into the "_metadata" file, which has been properly persisted since Flink 1.19.1 due to FLINK-35217.