SPARK-49374

RocksDB State Store Checkpoint Structure V2


Details

    • Epic
    • Status: Open
    • Major
    • Resolution: Unresolved
    • 4.0.0
    • None
    • Structured Streaming
    • None
    • RocksDB State Store Checkpoint Structure V2

    Description

      Design Doc: https://docs.google.com/document/d/1uWRMbN927cRXhSm5oeV3pbwb6o73am4r1ckEJDhAHa0/edit?usp=sharing

      Motivation

      We expect the new checkpoint structure to be beneficial because it establishes a linear dependency between batch versions. Right now, tasks can be executed multiple times for the same batchID (because of speculative execution or reruns in ForEachBatch), so multiple parallel lineages of state stores can exist at once. For example, one issue with ForEachBatch showed this lineage, which triggered a RocksDB file uploading bug:

      Although we fixed those bugs, this complexity makes the system prone to further bugs. The non-linear lineage also presents a correctness risk when some of the versions are changelog checkpoints.

      In the same example, suppose Version 17 is a snapshot checkpoint and Version 18 is a changelog checkpoint. When we need to recover from a checkpoint, we need to apply Version 17 and Version 18 together. However, Version 18 isn’t generated on top of Version 17. This can happen either because Version 18 was generated by a different worker from Version 17, or because the same worker abandoned Version 17, replayed the batch and generated Version 17’. In most cases the result is still accurate, but we have identified some edge cases where users might be surprised by the results, and there may already be correctness issues. These correctness issues will become more prominent with transformWithState because of partial updates (see the Appendix for examples). This issue is not specific to the RocksDB State Store but is a general state store problem, although the scope of this project is limited to the RocksDB State Store. Note that fixing the state store lineage only ensures that the state store itself is consistent; it does not ensure that the state store is consistent with outputs to the sink.

      Furthermore, the complex checkpoint version lineage makes it hard to reduce the overhead of old version cleanup. Even a long-running state store has to read metadata for all previous versions and list all files to do any cleanup safely, which is expensive. This is necessary because any version can be written by a different executor and can reference RocksDB files that the current executor isn’t aware of. In extreme cases, we can even corrupt the state store. The chance of that happening is very low, but it’s not a good idea to leave an unknown correctness risk unfixed.

      Proposal sketch

      The proposed checkpoint structure will ensure a linear dependency:

      This stronger guarantee will be a good foundation for the problems above.

       

      The proposal’s basic idea is to guarantee linear lineage by not allowing checkpoint overwriting. All checkpoints are made with a new file name with a uniqueID. When starting any batch, a task precisely identifies which checkpoint to load with the uniqueID.

      When a new state store checkpoint is generated, the checkpoint path name includes a globally unique ID, so that it can never be updated. Here is an example:
      20_d8e2ca47.zip

      21_ef6618c2.delta

      21_f4d05ac9.delta

      22_4489578d.delta

      The name is stored in the commit log too. When the next batch is being executed, those unique IDs are passed to the executors, where they make sure they start to execute from this checkpoint. If the local state store isn’t in this state, it will download the checkpoint from the cloud.

      Part II: Detailed design

      Architecture

      Previously, a state store checkpoint was stored in a path determined only by the checkpoint root path, operatorID, partitionID and batchID. When a stateful operator is executed, it can always construct the path from that information. It downloads the checkpoint from the path of the previous batchID and writes the new checkpoint to the path for this batchID. As mentioned earlier, this flexibility comes at a cost: there is no way to distinguish among checkpoints generated by different tasks rerunning for the same batchID.

       

      In this new design, every checkpoint will go to a globally unique file. The globally unique ID needs to be communicated between the driver and the executors, and stored in commit logs. The basic workflow is as follows:

      File Structure Under Checkpoint Path

      Currently, a checkpoint is stored in path <checkpoint_root>/<operatorID>/<partitionID>/<storeName>/<batchId>.[changelog, zip]. The path structure looks like the following:

       0 (operatorID)
       |
       +-- 0 (partitionID)
       |     +-- ……
       +-- 1 (partitionID)
       |     +-- default (storeName)
       |           +-- 20.zip
       |           +-- 21.delta
       |           +-- 22.delta
       |           +-- 23.delta
       +-- 2 (partitionID)
       +-- ……

       

      The general structure will be intact; we only change how the checkpoint files are named. In the example above, only the file names under the store directory (20.zip through 23.delta, shown in blue in the original document) change. Instead of naming the files <batchID>.zip or <batchID>.delta, we will name them <batchID>_<uniqueID>.zip or <batchID>_<uniqueID>.delta. This also means that there can be multiple files for the same batchID. Here is an example:

      20_d8e2ca47.zip

      21_ef6618c2.delta

      21_f4d05ac9.delta

      22_4489578d.delta

      23_689aa6bd.delta

      The randomID will be generated by the executor itself. In the first version, it is just a UUID, which can be shortened later if needed. In the Appendix, we will discuss a design decision on how to generate these random IDs. These random IDs will be persisted in commit logs. When a new batch starts, they will be passed to executors as part of the stateful operator information. At the end of the batch, the IDs for the new checkpoints will be passed back to the driver through an accumulator, and the driver persists them to the commit logs.
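The naming scheme can be sketched in a few lines of Python. This is a minimal illustration, not Spark code: the helper names new_checkpoint_name and parse_checkpoint_name are hypothetical, and the accepted ID pattern (lowercase hex with optional hyphens) is an assumption that covers both full UUIDs and the shortened IDs used in the examples.

```python
import re
import uuid
from typing import Optional, Tuple

# Hypothetical pattern: <batchID>_<uniqueID>.<zip|delta>
_NAME_RE = re.compile(r"^(\d+)_([0-9a-f-]+)\.(zip|delta)$")


def new_checkpoint_name(batch_id: int, ext: str, unique_id: Optional[str] = None) -> str:
    """Build a checkpoint file name; a fresh UUID is used when no ID is given."""
    if ext not in ("zip", "delta"):
        raise ValueError(f"unsupported extension: {ext}")
    uid = unique_id if unique_id is not None else str(uuid.uuid4())
    return f"{batch_id}_{uid}.{ext}"


def parse_checkpoint_name(name: str) -> Tuple[int, str, str]:
    """Split a file name back into (batchID, uniqueID, extension)."""
    match = _NAME_RE.match(name)
    if match is None:
        raise ValueError(f"not a V2 checkpoint file name: {name}")
    return int(match.group(1)), match.group(2), match.group(3)
```

Because every call without an explicit ID draws a new UUID, two attempts at the same batchID produce two different file names, so neither attempt can overwrite the other.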

       

      The lineage information is always managed in commit logs, but the lineage is often needed at the state store level, when a state store downloads a checkpoint or does cleanup. To make a state store self-contained, some lineage information is also stored in the .zip and .delta files themselves, so that a single state store can always finish all of its operations without relying on outside information. This decision will be discussed in the appendix. Each delta file will contain the unique checkpoint IDs of the last (presumed) snapshot checkpoint and of all changelog checkpoints following it. This is discussed in more detail below.

       

      There are still extra complexities to this problem, which we discuss in the following sections:
      1. Recovery case. The driver will tell executors which version to use, but how do executors find the older snapshots and deltas that precede this version?

      2. Cleanup of old versions.

      Commit Log Format Change

      We will add a new field to the commit message CommitMetadata. The proposal writes its type as Map[Map[Seq[String]]]; it represents the mapping operatorID→storeName→partitionID→checkpointUniqueID.
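As an illustration of the shape of this mapping, here is a minimal Python sketch that models it as nested dictionaries and round-trips it through JSON. The field name stateUniqueIds and the helper names are assumptions made for this example; the proposal does not name the new field. The list is indexed by partitionID, which matches the Seq[String] in the proposed type.

```python
import json
from typing import Dict, List, Tuple

# operatorID -> storeName -> list indexed by partitionID -> checkpoint uniqueID
CheckpointIds = Dict[int, Dict[str, List[str]]]


def encode_commit(next_batch_watermark_ms: int, ids: CheckpointIds) -> str:
    """Serialize a commit entry; 'stateUniqueIds' is a hypothetical field name."""
    return json.dumps(
        {"nextBatchWatermarkMs": next_batch_watermark_ms, "stateUniqueIds": ids}
    )


def decode_commit(text: str) -> Tuple[int, CheckpointIds]:
    """Parse a commit entry, restoring integer operator IDs (JSON keys are strings)."""
    raw = json.loads(text)
    ids = {int(op): stores for op, stores in raw.get("stateUniqueIds", {}).items()}
    return raw["nextBatchWatermarkMs"], ids


def unique_id_for(ids: CheckpointIds, operator_id: int, store_name: str, partition_id: int) -> str:
    """Look up the committed checkpoint uniqueID for one state store partition."""
    return ids[operator_id][store_name][partition_id]
```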

      Checkpoint File Format Change

      In both the changelog file and the zip file, we will write extra information about the uniqueIDs since the presumed last snapshot. It is a presumed snapshot because snapshotting can sometimes fail or be delayed, so that only a changelog file exists where the snapshot is presumed to be. In those cases, we can still reconstruct further lineage by reading the changelog file at the presumed snapshot version. By repeatedly reading lineage from earlier presumed snapshots, the whole lineage can always be reconstructed.

       

      When we need to download a checkpoint, we need to find the latest snapshot checkpoint and all subsequent changelog files. Usually, this can be done by reading only the delta file corresponding to the target unique checkpoint ID. In the case of a snapshot uploading failure, we will need to read more than one delta file.

       

      In the delta files, we will create a new entry type LINEAGE_RECORD, which contains a list of unique_IDs. These IDs represent unique IDs for version-1, version-2, etc.

       

      In the zip files, this list will be added to class RocksDBCheckpointMetadata.

       

      The information should always be available, as

      1. When initially loading a state store, we always have lineage information loaded since the last snapshot.
      2. When we add a new changelog checkpoint, we add one checkpoint ID to the list and it is still the full list.
      3. We can prune the list saved in memory based on presumed snapshot versions.
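The three points above can be sketched as a small in-memory tracker. This is a minimal Python model under stated assumptions: the class name LineageTracker and its methods are hypothetical, IDs are kept newest first (matching "version-1, version-2, etc."), and pruning is modeled as restarting the list at the presumed snapshot.

```python
from typing import List


class LineageTracker:
    """Keeps the checkpoint uniqueIDs since the presumed last snapshot, newest first."""

    def __init__(self, loaded_lineage: List[str]):
        # Point 1: lineage since the last snapshot is available right after loading.
        self._lineage = list(loaded_lineage)

    def lineage_for_next_file(self) -> List[str]:
        """IDs to write into the next checkpoint file (version-1, version-2, ...)."""
        return list(self._lineage)

    def record_checkpoint(self, unique_id: str, presumed_snapshot: bool) -> None:
        if presumed_snapshot:
            # Point 3: once a snapshot is presumed here, older IDs can be pruned.
            self._lineage = [unique_id]
        else:
            # Point 2: adding one ID keeps the list complete.
            self._lineage.insert(0, unique_id)
```

With the IDs from the examples in this document, loading at 20_d8e2ca47 and then checkpointing versions 21 to 23 yields the lineage (4489578d, f4d05ac9, d8e2ca47) for the version 23 file; marking version 23 as a presumed snapshot leaves only 689aa6bd for the version 24 file.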

       

      An alternative to changing the checkpoint file format is for the state store to read commit log files to get the lineage. This has shortcomings: 1. it has to read multiple commit logs, which can be slow; 2. the state store checkpoint directory won’t be self-contained, and the state store has to understand the commit log.

      Passing UniqueID from Driver to Executors

      We will add a field in class StatefulOperatorStateInfo to indicate the uniqueID to use. The field will be an array of strings, where each string corresponds to the uniqueID for one partition. This class is already serialized to the executors.

      Passing UniqueID to Driver

      The uniqueID will be passed back with an approach similar to EventTimeWatermarkExec, as well as Marlin’s end offset. StatefulOperator will have an optional accumulator, through which we receive the uniqueIDs from all tasks. In some cases, we may receive more than one uniqueID for one partition. The driver can simply pick any of them as the one to commit. Note that this never happens when the executors pass Marlin’s end offset back, as only one attempt will be made. In microbatch mode, some of the IDs might come from failed tasks, and we might pick one of them. This should still be correct: even if the task fails, the checkpoint should still be valid to use. One potential problem is that the checkpoint we use may not match the output written to the sink. This is not a new problem, as the same mismatch can already happen today. We can revisit the decision if we see such a problem in the future.

       

      We will pass back not just a uniqueID for the new checkpoint based on V, but also the unique ID used for V-1. This will help the driver to validate that lineage is as expected. 
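The driver-side selection can be sketched as follows. This is a minimal Python model, not the accumulator implementation: CheckpointReport and choose_ids_to_commit are hypothetical names, "pick any" is implemented as "first report wins", and raising on a lineage mismatch is an assumption about how the validation would be enforced.

```python
from typing import Dict, Iterable, NamedTuple


class CheckpointReport(NamedTuple):
    partition_id: int
    new_id: str   # uniqueID of the checkpoint written for version V
    base_id: str  # uniqueID of the version V-1 checkpoint the task started from


def choose_ids_to_commit(
    reports: Iterable[CheckpointReport], committed_prev: Dict[int, str]
) -> Dict[int, str]:
    """Pick one uniqueID per partition after checking it builds on the committed V-1."""
    chosen: Dict[int, str] = {}
    for report in reports:
        expected = committed_prev.get(report.partition_id)
        if expected is not None and report.base_id != expected:
            # Assumption: a checkpoint built on an uncommitted base is an error.
            raise ValueError(
                f"partition {report.partition_id}: built on {report.base_id}, "
                f"expected {expected}"
            )
        # Any valid report is acceptable; keep the first one seen.
        chosen.setdefault(report.partition_id, report.new_id)
    return chosen
```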

      State Store Checkpointing

      When state store checkpointing starts, a unique ID (UUID) is generated and stored in the state store provider. This ID is used to construct the delta and zip file names for the checkpoint. The checkpoint procedure is mostly the same as today. The only difference is that we need to preserve the unique ID and the lineage information and make sure we use the correct ones. Since the snapshot checkpoint is done asynchronously, we need to copy the unique ID and lineage at the time the snapshot is taken, so that they stay consistent with the snapshot.

       

      Checkpointing would look like the following:
      20_d8e2ca47.delta (lineage in file: …)

      20_d8e2ca47.zip (lineage in file: …)

      21_ef6618c2.delta (lineage in file: d8e2ca47)

      21_f4d05ac9.delta (lineage in file: d8e2ca47)

      22_4489578d.delta (lineage in file: f4d05ac9, d8e2ca47)

      23_689aa6bd.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)

      Assume that a snapshot checkpoint is scheduled for version 23 as well. When the snapshotting later succeeds, the snapshot file will show up:
      20_d8e2ca47.delta (lineage in file: …)

      20_d8e2ca47.zip (lineage in file: …)

      21_ef6618c2.delta (lineage in file: d8e2ca47)

      21_f4d05ac9.delta (lineage in file: d8e2ca47)

      22_4489578d.delta (lineage in file: f4d05ac9, d8e2ca47)

      23_689aa6bd.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)

      23_689aa6bd.zip (lineage in file: 4489578d, f4d05ac9, d8e2ca47)

      It is possible that 23_689aa6bd.zip fails to be uploaded. In this case, we will stay with a delta file only and continue from there.

      The following delta files may only contain lineage up to version 23, but we can trace further back to version 20 by reading 23_689aa6bd.delta:

      20_d8e2ca47.delta (lineage in file: …)

      20_d8e2ca47.zip (lineage in file: …)

      21_ef6618c2.delta (lineage in file: d8e2ca47)

      21_f4d05ac9.delta (lineage in file: d8e2ca47)

      22_4489578d.delta (lineage in file: f4d05ac9, d8e2ca47)

      23_689aa6bd.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)

      24_32e3cc2a.delta (lineage in file: 689aa6bd)

      It is possible that two executors both attempt a full snapshot for the same version, and that the one picked for the lineage fails while the other succeeds. For example, suppose (Version=23, ID=8205c96f) got a full snapshot checkpoint. The result looks like this:

      20_d8e2ca47.delta (lineage in file: …)

      20_d8e2ca47.zip (lineage in file: …)

      21_ef6618c2.delta (lineage in file: d8e2ca47)

      21_f4d05ac9.delta (lineage in file: d8e2ca47)

      22_4489578d.delta (lineage in file: f4d05ac9, d8e2ca47)

      23_689aa6bd.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)

      23_8205c96f.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)

      23_8205c96f.zip (lineage in file: 4489578d, f4d05ac9, d8e2ca47)

      24_32e3cc2a.delta (lineage in file: 689aa6bd)

      But since 23_8205c96f is never referenced by the lineage in either the commit log or later checkpoint files such as 24_32e3cc2a, its files are ignored, and 23_689aa6bd.delta is still used for version 23.

       

      Note that there is a chance where the same executor executed the same partition of the same batchID twice and both were successful. This is common in the case of ForEachBatch. We need to make sure we name the snapshot correctly. Here is an example:

      1. Executed version 23, picked uniqueID 689aa6bd and successfully uploaded file 23_689aa6bd.delta. 
      2. Make RocksDB checkpoint Ckp1.
      3. Return to the driver, and 23_689aa6bd is chosen to be committed.
      4. Executed version 23, picked uniqueID 8205c96f and successfully uploaded file 23_8205c96f.delta. 
      5. Make RocksDB checkpoint Ckp2.
      6. Maintenance thread wakes up for snapshot checkpoints.

      The maintenance thread needs to upload two snapshots: Ckp1 → 23_689aa6bd.zip, as well as Ckp2 → 23_8205c96f.zip. Both need to be uploaded because the thread doesn’t know which one will be committed to the commit log. If it uploaded only one, there is a chance that it always picks the wrong one and we don’t have snapshot checkpointing for a long time, or ever.

      One potential optimization is for the snapshot to wait a little for the next batch to start, so that it knows that 23_689aa6bd is the one picked, and can upload only 23_689aa6bd.zip and skip 23_8205c96f.zip. If the next batch doesn’t start in time, or ever, it will upload both.
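The upload decision, including the optional optimization, can be sketched as a single function. This is a minimal Python model: snapshots_to_upload is a hypothetical name, and committed_id stands for the ID the next batch reveals as committed (None when the next batch has not started in time).

```python
from typing import List, Optional


def snapshots_to_upload(
    version: int, pending_ids: List[str], committed_id: Optional[str]
) -> List[str]:
    """Return the snapshot file names the maintenance thread should upload."""
    if committed_id is not None and committed_id in pending_ids:
        # The next batch already told us which checkpoint was committed.
        return [f"{version}_{committed_id}.zip"]
    # Unknown winner: upload every local checkpoint for this version.
    return [f"{version}_{uid}.zip" for uid in pending_ids]
```

For the sequence above, calling it with version 23, the pending IDs 689aa6bd and 8205c96f, and no known committed ID yields both zip names; passing 689aa6bd as the committed ID yields only 23_689aa6bd.zip.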

      State Store Load

      In the normal case, the executor already has the required state store open. It just needs to validate that the open state store is at the checkpoint ID sent from the driver. If the ID matches, the state store is already loaded. If not, we need to reload it from cloud storage.

       

      When we load a checkpoint, we first construct the checkpoint names <batchID>_<uniqueID>.zip and <batchID>_<uniqueID>.delta. If the former file exists, we just load it. Otherwise, we open the latter and read the first record, which is supposed to be the lineage metadata. Using the metadata, we can download the last snapshot version and apply the subsequent delta files.

       

      Example 1. The uniqueID has a snapshot.

      In this example, we are trying to load (version=23, uniqueID=689aa6bd). 

      20_d8e2ca47.delta

      20_d8e2ca47.zip

      21_ef6618c2.delta

      21_f4d05ac9.delta

      22_4489578d.delta

      23_689aa6bd.delta

      23_689aa6bd.zip

      We see there is a file 23_689aa6bd.zip, so we just use the file.

       

      Example 2. The uniqueID only contains a delta log file.

      If we still load (version=23, uniqueID=689aa6bd), but there is only a delta file:
      20_d8e2ca47.delta (lineage in file: …)

      20_d8e2ca47.zip (lineage in file: …)

      21_ef6618c2.delta (lineage in file: d8e2ca47)

      21_f4d05ac9.delta (lineage in file: d8e2ca47)

      22_4489578d.delta (lineage in file: f4d05ac9, d8e2ca47)

      23_689aa6bd.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)

      In this case, we will read the lineage from 23_689aa6bd.delta and apply these files: 20_d8e2ca47.zip, 21_f4d05ac9.delta, 22_4489578d.delta, 23_689aa6bd.delta. The outcome is the same even if there is a snapshot file for the same version but with a different checkpoint ID:

      20_d8e2ca47.delta (lineage in file: …)

      20_d8e2ca47.zip (lineage in file: …)

      21_ef6618c2.delta (lineage in file: d8e2ca47)

      21_f4d05ac9.delta (lineage in file: d8e2ca47)

      22_4489578d.delta (lineage in file: f4d05ac9, d8e2ca47)

      23_689aa6bd.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)

      23_8205c96f.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)

      23_8205c96f.zip (lineage in file: 4489578d, f4d05ac9, d8e2ca47)

      In this case, 23_8205c96f.zip and 23_8205c96f.delta are simply ignored.

       

      Example 3. Can’t find snapshot file in presumed snapshot location.

      In the case where snapshot uploading fails, we will need to continue tracing the lineage. Consider the following example:

      20_d8e2ca47.delta (lineage in file: …)

      20_d8e2ca47.zip (lineage in file: …)

      21_ef6618c2.delta (lineage in file: d8e2ca47)

      21_f4d05ac9.delta (lineage in file: d8e2ca47)

      22_4489578d.delta (lineage in file: f4d05ac9, d8e2ca47)

      23_689aa6bd.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)

      24_32e3cc2a.delta (lineage in file: 689aa6bd)

      When we are loading (version=24, ID=32e3cc2a), we only have lineage up to 23_689aa6bd. However, there is no file 23_689aa6bd.zip. In this case, we will open 23_689aa6bd.delta and trace the lineage further back. Eventually, we will need to apply the following files: 20_d8e2ca47.zip, 21_f4d05ac9.delta, 22_4489578d.delta, 23_689aa6bd.delta, 24_32e3cc2a.delta.
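The load procedure covering all three examples can be sketched as follows. This is a minimal Python model under stated assumptions: files_to_apply is a hypothetical name, the directory listing is a set of file names, and lineage_in_delta stands in for the LINEAGE_RECORD read from each delta file (newest ID first, with the oldest entry being the presumed snapshot).

```python
from typing import Dict, List, Set


def files_to_apply(
    version: int,
    unique_id: str,
    files: Set[str],
    lineage_in_delta: Dict[str, List[str]],
) -> List[str]:
    """Return the snapshot followed by the delta files to apply, oldest first."""
    snapshot = f"{version}_{unique_id}.zip"
    if snapshot in files:
        return [snapshot]  # Example 1: the target itself has a snapshot.

    newest_first: List[str] = []
    v, uid = version, unique_id
    while True:
        delta = f"{v}_{uid}.delta"
        if delta not in files:
            raise FileNotFoundError(delta)
        newest_first.append(delta)
        lineage = lineage_in_delta.get(delta, [])
        if not lineage:
            raise ValueError(f"no snapshot reachable from {delta}")
        # All entries except the oldest are plain changelog checkpoints.
        for offset, prev_id in enumerate(lineage[:-1], start=1):
            name = f"{v - offset}_{prev_id}.delta"
            if name not in files:
                raise FileNotFoundError(name)
            newest_first.append(name)
        # The oldest entry is the presumed snapshot.
        v, uid = v - len(lineage), lineage[-1]
        presumed = f"{v}_{uid}.zip"
        if presumed in files:
            return [presumed] + newest_first[::-1]
        # Example 3: snapshot missing, keep tracing through its delta file.
```

Files that are never reached through the lineage, such as 21_ef6618c2.delta or 23_8205c96f.zip, are never consulted, which is how abandoned attempts end up ignored.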

      Note that even if a snapshot file shows up for a uniqueID, we will ignore it and use the same 

      Compatibility

      The new format cannot be consumed by previous releases, so the feature will be turned off by default for microbatch mode. Later we will switch the default to on.

       

      Within the same release, we will allow users to switch between V1 and V2. The driver will read the commit log and check whether a uniqueID is available. If it is available, it is sent to the executors so that they can load checkpoints accordingly. Otherwise, the driver will assume V1. When switching modes, we will need to make sure that the first checkpoint is a full snapshot checkpoint taken synchronously. That way, loading a checkpoint only needs to deal with files generated by either V1 or V2, never a mixture of the two.
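The detection and switching rule can be sketched as two small functions. This is a minimal Python model: the commit entry is represented as a dictionary, the field name stateUniqueIds is a hypothetical stand-in for the new commit log field, and both function names are made up for this example.

```python
from typing import Any, Dict


def checkpoint_format(commit_entry: Dict[str, Any]) -> str:
    """V2 if the last commit carries uniqueIDs, otherwise V1."""
    return "V2" if commit_entry.get("stateUniqueIds") else "V1"


def must_force_full_snapshot(last_commit: Dict[str, Any], v2_enabled: bool) -> bool:
    """A synchronous full snapshot is needed exactly when the format changes."""
    wants = "V2" if v2_enabled else "V1"
    return checkpoint_format(last_commit) != wants
```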

       

      Appendix: Data Correctness Issue When Replaying Unmatched Changelog Files

      Here is an example where users are not able to achieve what they want. Consider such a query:

      ……
      .withColumn("random_key", (rand() * 1024).cast("int"))
      .groupBy("random_key")
      .agg(collect_list($"payload").as("payload"))
      ……

      All the query does is randomly distribute an entry to one of the 1024 buckets, and inside a bucket, entries are collected together. Consider such an execution:

      Batch 1:
        Input: foo
        Task1 distributes it to key 6, so state store state: (6→ foo), we checkpoint it to 1.snapshot
        Task2 distributes it to key 8, so state store state: (8 →foo), but checkpoint finishes and the checkpoint is lost

       

      Batch 2:
        Input: bar
        The only task starts with what is left over by Task2 (8→foo) and distributes the entry to key 6, so it has a delta state store update (6→bar), and it successfully checkpoints to 2.delta

      Query Restart:
        The query is restarted and replays 1.snapshot + 2.delta. The outcome is (6→bar). “foo” is lost.
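The execution above can be replayed with a small Python simulation. It is a toy model of the state store, not Spark code: the state is a dictionary from key to collected payloads, and a delta is assumed to overwrite the full value of each key it touches.

```python
from typing import Dict, List

State = Dict[int, List[str]]


def apply_delta(state: State, delta: State) -> State:
    """Apply a changelog that overwrites the value of every key it contains."""
    result = {key: list(values) for key, values in state.items()}
    for key, values in delta.items():
        result[key] = list(values)
    return result


# Batch 1: two attempts for the same batch put "foo" under different keys.
snapshot_v1: State = {6: ["foo"]}  # Task1's state, persisted as 1.snapshot
task2_state: State = {8: ["foo"]}  # Task2's state, whose checkpoint is lost

# Batch 2 runs on top of Task2's state and sends "bar" to key 6.
delta_v2: State = {6: task2_state.get(6, []) + ["bar"]}  # persisted as 2.delta

live_state = apply_delta(task2_state, delta_v2)  # what the running query sees
recovered = apply_delta(snapshot_v1, delta_v2)   # 1.snapshot + 2.delta
```

The running query holds both entries, while the recovered state contains only (6→bar), because 2.delta was not generated on top of 1.snapshot.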

       

      The issue is more prominent with transformWithState. Here is an example:

       

      In transformWithState, the user writes the UDF to sample exactly 3 elements from the key group. And this can happen:
      Batch n:

      At Batch n Start, they have elements (A, B, C)

      In Batch n, element D comes

      Task 1 randomly drops A, checkpointing as snapshot (B, C, D); Task 2 randomly drops B, checkpointing as snapshot (A, C, D), Task 2's checkpointing wins, leaving (A, C, D) as the official checkpoint;

      Batch n+1:

      In Batch n+1, element E comes.

      Task 1 continues processing against its local state store (B, C, D), drops B, and checkpoints a changelog (add E, remove B)

       

      Query Restart:

      Now if the query restarts and recovers from checkpoints, we apply (A, C, D) + (add E, remove B). There is actually no B in the snapshot. Assuming that blindly removing a missing B is allowed, we are left with (A, C, D, E). The user wrote the UDF to sample 3 elements, but got 4.
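The same scenario as a small Python simulation. It is a toy model rather than transformWithState code: the state is a set of elements, the changelog is a list of (operation, element) pairs, and removing a missing element is assumed to be a silent no-op.

```python
from typing import List, Set, Tuple

Changelog = List[Tuple[str, str]]


def replay(state: Set[str], changelog: Changelog) -> Set[str]:
    """Apply add/remove entries; removing a missing element is a silent no-op."""
    result = set(state)
    for op, element in changelog:
        if op == "add":
            result.add(element)
        else:
            result.discard(element)
    return result


official_snapshot = {"A", "C", "D"}  # Task 2's checkpoint for batch n, which won
task1_local = {"B", "C", "D"}        # Task 1's local state after batch n

# Batch n+1 runs on Task 1's local state: E arrives and B is dropped.
changelog_n1: Changelog = [("add", "E"), ("remove", "B")]

task1_after = replay(task1_local, changelog_n1)      # state in the live query
recovered = replay(official_snapshot, changelog_n1)  # state after restart
```

The live state keeps exactly three elements, while the recovered state has four, because the changelog was written against a different version n state than the one that was committed.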

      Attachments

        1. image-2024-08-23-14-30-22-474.png
          46 kB
          Siying Dong
        2. image-2024-08-23-14-29-59-165.png
          20 kB
          Siying Dong
        3. image-2024-08-23-14-29-19-443.png
          75 kB
          Siying Dong
        4. image-2024-08-23-14-28-56-418.png
          40 kB
          Siying Dong

        Activity

          People

            Unassigned Unassigned
            siying Siying Dong