  Ignite / IGNITE-16907

Add ability to use Raft log as storage WAL within the scope of local recovery


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Fix Version/s: 3.0.0-beta1

    Description

      Problem

      From a bird's-eye view, the raft-to-storage flow looks like this (a sketch of steps 3-6 follows the list):

      1. RaftGroupService#run(writeCommand());
      2. Inner raft replication logic; once the command is replicated to a majority, raft.committedIndex is adjusted.
      3. Propagate the command to RaftGroupListener (the raft state machine):
        RaftGroupListener#onWrite(closure(writeCommand()));
      4. Within the state machine, insert the data from the writeCommand into the underlying storage:
        var insertRes = storage.insert(cmd.getRow(), cmd.getTimestamp());
      5. Ack that the data was applied successfully:
        clo.result(insertRes);
      6. Move raft.appliedIndex to the corresponding value, meaning that the data for this index has been applied to the state machine.
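
      The following is a minimal Java sketch of steps 3-6, purely for illustration: the Storage, WriteCommand and CommandClosure types here are hypothetical stand-ins and do not reproduce the exact Ignite 3 interfaces.

      import java.util.Iterator;
      import java.util.concurrent.atomic.AtomicLong;

      interface WriteCommand { byte[] getRow(); long getTimestamp(); }

      interface CommandClosure<T extends WriteCommand> {
          T command();
          long index();              // raft log index of this command
          void result(Object res);   // ack back to the raft replicator (step 5)
      }

      interface Storage { Object insert(byte[] row, long timestamp); }

      class SketchRaftGroupListener {
          private final Storage storage;
          private final AtomicLong appliedIndex = new AtomicLong();

          SketchRaftGroupListener(Storage storage) { this.storage = storage; }

          /** Invoked by the replicator for committed write commands (step 3). */
          void onWrite(Iterator<CommandClosure<WriteCommand>> closures) {
              while (closures.hasNext()) {
                  CommandClosure<WriteCommand> clo = closures.next();
                  WriteCommand cmd = clo.command();

                  // Step 4: apply the command to the underlying storage.
                  Object insertRes = storage.insert(cmd.getRow(), cmd.getTimestamp());

                  // Step 5: ack that the data was applied successfully.
                  clo.result(insertRes);

                  // Step 6: advance appliedIndex for this raft log entry.
                  appliedIndex.set(clo.index());
              }
          }
      }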

      The most interesting part, especially for this ticket, relates to step 4.

      In the real world, the storage doesn't flush every mutator to disk; instead, it buffers some number of such mutators and flushes them all together as part of a checkpointing process. Thus, if the node fails before mutatorsBuffer.flush(), it might lose some data, because on recovery raft will apply data starting from appliedIndex + 1. A sketch of this buffering pattern follows.
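
      Purely as an illustration of the hazard above (the BufferedStorage name and its methods are hypothetical, not the real Ignite 3 storage API):

      import java.util.ArrayList;
      import java.util.List;

      class BufferedStorage {
          private final List<String> mutatorsBuffer = new ArrayList<>(); // in-memory only
          private final List<String> disk = new ArrayList<>();           // durable data

          /** Buffers the mutation; the caller advances appliedIndex right after this call. */
          void insert(String row) {
              mutatorsBuffer.add(row);
          }

          /** Checkpoint: only here does the buffered data become durable. */
          void flush() {
              disk.addAll(mutatorsBuffer);
              mutatorsBuffer.clear();
          }
      }

      If the node crashes between insert() and flush(), everything still sitting in mutatorsBuffer is lost, while appliedIndex already points past those entries, so a recovery that replays from appliedIndex + 1 will never re-apply them.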

      Possible solutions:

      There are several possibilities to solve this issue:

      1. In-storage WAL. A bad solution: there's already a raft log that can be used as a WAL, so such duplication is redundant.
      2. Local recovery starting from appliedIndex - mutatorsBuffer.size. A bad solution: it won't work for non-idempotent operations and exposes inner storage details such as mutatorsBuffer.size.
      3. proposedIndex propagation + checkpointIndex synchronization. Seems fine. More details below:
      • First of all, in order to coordinate the raft replicator and the storage, the proposedIndex should be propagated to the RaftGroupListener and the storage.
      • On every checkpoint, the storage will persist the corresponding proposedIndex as checkpointIndex.
        • In case of storage-internal checkpoints, the storage won't notify the raft replicator about the new checkpointIndex. This kind of notification is an optimization that does not affect the correctness of the protocol.
        • In case of an external checkpoint request, e.g. raft snapshotting for the purposes of raft log truncation, the corresponding checkpointIndex will be propagated to the raft replicator within the "onSnapshotDone" callback.
      • During local recovery, raft will apply raft log entries from the very beginning. If checkpointIndex turns out to be bigger than the proposedIndex of a replayed raft log entry, the storage fails the proposed closure with IndexMismatchException(checkpointIndex), which leads to a proposedIndex shift and an optional async raft log truncation (see the sketch after this list).
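
      Below is a minimal sketch of solution 3, under the assumption of simplified, hypothetical types (IndexedStorage, IndexMismatchException); the real interfaces would be richer, but the index bookkeeping is the essential part:

      /** Thrown when a replayed entry is already covered by the last checkpoint. */
      class IndexMismatchException extends RuntimeException {
          final long checkpointIndex;

          IndexMismatchException(long checkpointIndex) {
              super("Entry is already covered by checkpointIndex " + checkpointIndex);
              this.checkpointIndex = checkpointIndex;
          }
      }

      class IndexedStorage {
          private long lastProposedIndex;        // proposedIndex of the latest applied mutator
          private volatile long checkpointIndex; // persisted together with the checkpointed data

          /** Applies a mutator tagged with the proposedIndex of its raft log entry. */
          void insert(byte[] row, long proposedIndex) {
              // During recovery replay, entries already covered by the last checkpoint must
              // not be re-applied (they may be non-idempotent), so fail the closure instead.
              if (proposedIndex <= checkpointIndex) {
                  throw new IndexMismatchException(checkpointIndex);
              }

              // ... buffer the mutation itself ...
              lastProposedIndex = proposedIndex;
          }

          /** Checkpoint: flush buffered data and persist the proposedIndex as checkpointIndex. */
          void checkpoint() {
              // ... flush mutatorsBuffer to disk atomically with the index below ...
              checkpointIndex = lastProposedIndex;
          }
      }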

      Let's consider the following example:

      checkpointBuffer = 3. [P] - persisted entries, [!P] - not persisted / in-memory ones.

      1. raft.put(k1,v1)
        1. -> raftlog[cmd(k1,v1, index:1)]
        2. -> storage[(k1,v1), index:1]
        3. -> appliedIndex:1
      2. raft.put(k2,v2)
        1. -> raftlog[cmd(k1,v1, index:1), cmd(k2,v2, index:2)]
        2. -> storage[(k1,v1), (k2,v2), index:2]
        3. -> appliedIndex:2
      3. raft.put(k3,v3)
        1. -> raftlog[cmd(k1,v1, index:1), cmd(k2,v2, index:2), cmd(k3,v3, index:3)]
        2. -> storage[(k1,v1), (k2,v2), (k3,v3), index:3]
        3. -> appliedIndex:3
        4. inner storage checkpoint
          1. raftlog[cmd(k1,v1, index:1), cmd(k2,v2, index:2),  cmd(k3,v3, index:3)]
          2. storage[(k1,v1, proposedIndex:1), (k2,v2, proposedIndex:2), (k3,v3, proposedIndex:3)]
          3. checkpointedData[(k1,v1), (k2,v2), (k3,v3), checkpointIndex:3]
      4. raft.put(k4,v4)
        1. -> raftlog[cmd(k1,v1, index:1), cmd(k2,v2, index:2), cmd(k3,v3, index:3), cmd(k4,v4, index:4)]
        2. -> storage[(k1,v1), (k2,v2), (k3,v3), (k4,v4), index:4]
        3. -> checkpointedData[(k1,v1), (k2,v2), (k3,v3), checkpointIndex:3]
        4. -> appliedIndex:4
      5. Node failure
      6. Node restart
        1. StorageRecovery: storage.apply(checkpointedData)
        2. raft-to-storage data application starting from index: 1 // raft doesn't know checkpointIndex at this point (see the replay sketch after the example).
          1. -> storageResponse::IndexMismatchException(3)
            1.  raft-to-storage data application starting from index: 3 + 1
      7. Recovery result:
        1. -> raftlog[cmd(k1,v1, index:1), cmd(k2,v2, index:2),  cmd(k3,v3, index:3), cmd(k4,v4, index:4)]
        2. -> storage[(k1,v1), (k2,v2), (k3,v3), (k4,v4), index:4]
        3. -> checkpointedData[(k1,v1), (k2,v2), (k3,v3), checkpointIndex:3]
        4. -> appliedIndex:4
      8. Raft log truncation
        1. storage.forceCheckpoint
          1. -> raftlog[index:4]
          2. -> storage[(k1,v1), (k2,v2), (k3,v3), (k4,v4), index:4]
          3. -> checkpointedData[(k1,v1), (k2,v2), (k3,v3), (k4,v4), checkpointIndex:4]
          4. -> appliedIndex:4
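
      To tie the example together, here is a sketch of the recovery-time replay from step 6 of the example, reusing the hypothetical IndexedStorage and IndexMismatchException types from the sketch above:

      import java.util.List;

      class RecoveryReplaySketch {
          /** Replays raft log entries into the storage, skipping entries already covered by the checkpoint. */
          static void replay(IndexedStorage storage, List<byte[]> raftLog) {
              long index = 1; // raft doesn't know checkpointIndex at this point, so start from the beginning

              while (index <= raftLog.size()) {
                  try {
                      storage.insert(raftLog.get((int) (index - 1)), index);
                      index++;
                  } catch (IndexMismatchException e) {
                      // The storage already covers everything up to checkpointIndex: shift the
                      // replay position to checkpointIndex + 1; the raft log prefix up to
                      // checkpointIndex may optionally be truncated asynchronously.
                      index = e.checkpointIndex + 1;
                  }
              }
          }
      }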


          People

            ibessonov Ivan Bessonov
            alapin Alexander Lapin
            Kirill Tkalenko


              Time Tracking

                Original Estimate: Not Specified
                Remaining Estimate: 0h
                Time Spent: 9.5h
