Uploaded image for project: 'Ignite'
  1. Ignite
  2. IGNITE-16907

Add ability to use Raft log as storage WAL within the scope of local recovery

    XMLWordPrintableJSON

Details

    • Improvement
    • Status: Resolved
    • Major
    • Resolution: Fixed
    • None
    • 3.0.0-beta1
    • None

    Description

      Problem

      From the birds eye view raft-to-storage flow looks similar to

      1. RaftGroupService#run(writeCommand());
      2. Inner raft replication logic, when replicated on majority adjust raft.commitedIndex.
      3. Propagate command to RaftGroupListener (raft state machine).
        RaftGroupListener#onWrite(closure(writeCommand()));
      4. Within state machine insert data from writeCommand to underneath storage:  
        var insertRes = storage.insert(cmd.getRow(), cmd.getTimestamp());
      5. ack that data was applied successfully 
        clo.result(insertRes);
      6. move raft.appliedIndex to corresponding value, meaning that the data for this index is applied to the state machine.

      The most interesting part, especially for given ticket, relates to step 4.

      In real world storage doesn't flush every mutator on disk, instead it buffers some amount of such mutators and flush them all-together as a part of some checkpointing process. Thus, if node fails before mutatorsBuffer.flush() it might lost some data because raft will apply data starting from appliedIndex + 1 on recovery.

      Possible solutions:

      There are several possibilities to solve this issue:

      1. In-storage WAL. Bad solution, because there's already raft log that can be used as a WAL. Such duplication is redundant.
      2. Local recovery starting from appliedIndex - mutatorsBuffer.size. Bad solution. Won't work for not-idempotent operations. Exposes inner storage details such as mutatorBuffer.size.
      3. proposedIndex propagation + checkpointIndex synchonization. Seems fine. More details below:
      • First off all, in order to coordinate raft replicator and storage, proposedIndex should be propagated to raftGroupListener and storage.
      • On every checkpoint, storage will persist corresponding proposed index as checkpointIndex.
        • In case of storage inner checkpoints, storage won't notify raft replicator about new checkpointIndex. This kind of notification is an optimization that does not affect the correctness of the protocol.
        • In case of outer checkpoint intention, e.g. raft snapshotting for the purposes of raft log truncation, corresponding checkpointIndex will be propagated to raft replicator within a callback "onShapshotDone".
      • During local recovery raft will apply raft log entries from the very begging. If checkpointIndex occurred to be bigger than proposedIndex on an another raft log entity it fails the proposed closure with IndexMismatchException(checkpointIndex) that leads to proposedIndex shift and optional async raft log truncation.

      Let's consider following example:

      ] checkpointBuffer = 3. [P] - perisisted entities, [!P] - not perisisted/in memory one.

      1. raft.put(k1,v1)
        1. -> raftlog[cmd(k1,v1, index:1)]
        2. -> storage[(k1,v1), index:1]
        3. -> appliedIndex:1
      2. raft.put(k2,v2)
        1. -> raftlog[cmd(k1,v1, index:1), \\{*}cmd(k2,v2, index:2)\\{*}]
        2. -> storage[(k1,v1), \\{*}(k2,v2)\\{*}, ** index:\\{*}2\\{*}]
        3. -> appliedIndex:2
      3. raft.put(k3,v3)
        1. -> raftlog[cmd(k1,v1, index:1), cmd(k2,v2, index:2),  \\{*}cmd(k3,v3, index:3)\\{*}]
        2. -> storage[(k1,v1), (k2,v2), \\{*}(k3,v3)\\{*}, index:\\{*}3\\{*}]
        3. -> appliedIndex:3
        4. inner storage checkpoint
          1. raftlog[cmd(k1,v1, index:1), cmd(k2,v2, index:2),  cmd(k3,v3, index:3)]
          2. storage[(k1,v1, proposedIndex:1), (k2,v2, proposedIndex:2), (k3,v3, proposedIndex:3)]
          3. checkpointedData[(k1,v1),* *(k2,v2),* \\{*}(k3,v3), checkpointIndex:3\\{*}{*}\\{*}{*}]{}
      4. raft.put(k4,v4)
        1. -> raftlog[cmd(k1,v1, index:1), cmd(k2,v2, index:2),  cmd(k3,v3, index:3), \\{*}cmd(k4,v4, index:4)\\{*}]
        2. -> storage[(k1,v1), (k2,v2), (k3,v3), *(k4,v4)* index:\\{*}4\\{*}]
        3. -> checkpointedData[(k1,v1), (k2,v2), (k3,v3), checkpointIndex:3]
        4. -> appliedIndex:4
      5. Node failure
      6. Node restart
        1. StorageRecovery: storage.apply(checkpointedData)
        2. raft-to-storage data application starting from index: 1 // raft doesn't know checkpointedIndex at this point.
          1. -> storageResponse::IndexMismatchException(3)
            1.  raft-to-storage data application starting from index: 3 + 1
      7. Recovey result:
        1. -> raftlog[cmd(k1,v1, index:1), cmd(k2,v2, index:2),  cmd(k3,v3, index:3), cmd(k4,v4, index:4)]
        2. -> storage[(k1,v1), (k2,v2), (k3,v3), (k4,v4) index:4]
        3. -> checkpointedData[(k1,v1), (k2,v2), (k3,v3), checkpointIndex:3]
        4. -> appliedIndex:4
      8. Raft log truncation
        1. storage.forceCheckpoint
          1. -> raftlog[index:4]
          2. -> storage[(k1,v1), (k2,v2), (k3,v3), (k4,v4) index:4]
          3. -> checkpointedData[(k1,v1), (k2,v2), (k3,v3), (k4,v4) checkpointIndex:4]
          4. -> appliedIndex:4

      Attachments

        Issue Links

          Activity

            People

              ibessonov Ivan Bessonov
              alapin Alexander Lapin
              Kirill Tkalenko Kirill Tkalenko
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 9.5h
                  9.5h