Details
- Type: Improvement
- Status: Resolved
- Priority: Major
- Resolution: Fixed
Description
Problem
From a bird's-eye view, the raft-to-storage flow looks similar to the following:
1. RaftGroupService#run(writeCommand());
2. Inner raft replication logic: once the command is replicated on a majority, raft.commitedIndex is adjusted.
3. Propagate the command to RaftGroupListener (the raft state machine):
RaftGroupListener#onWrite(closure(writeCommand()));
4. Within the state machine, insert the data from writeCommand into the underlying storage:
var insertRes = storage.insert(cmd.getRow(), cmd.getTimestamp());
5. Acknowledge that the data was applied successfully:
clo.result(insertRes);
6. Move raft.appliedIndex to the corresponding value, meaning that the data for this index has been applied to the state machine.
The most interesting part, especially for this ticket, relates to step 4.
In the real world, a storage doesn't flush every mutator to disk. Instead, it buffers a number of mutators and flushes them all together as part of a checkpointing process. Thus, if a node fails before mutatorsBuffer.flush(), it might lose some data, because on recovery raft will apply data starting from appliedIndex + 1.
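The data loss described above can be reproduced with a minimal sketch. Everything in it is hypothetical and only serves the illustration: BufferedStorage, NaiveRecoveryDemo and the choice to flush after the third command are assumptions, not Ignite code. It assumes that raft persists appliedIndex while the storage keeps unflushed mutators in memory.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical storage that buffers writes in memory and persists them only on flush(). */
class BufferedStorage {
    private final Map<String, String> durable = new HashMap<>();        // survives a crash
    private final Map<String, String> mutatorsBuffer = new HashMap<>(); // lost on a crash

    void insert(String key, String value) {
        mutatorsBuffer.put(key, value);
    }

    void flush() {
        durable.putAll(mutatorsBuffer);
        mutatorsBuffer.clear();
    }

    /** Simulates a node failure: everything that was not flushed disappears. */
    void crash() {
        mutatorsBuffer.clear();
    }

    boolean contains(String key) {
        return durable.containsKey(key) || mutatorsBuffer.containsKey(key);
    }
}

/** Hypothetical driver showing naive recovery that starts from appliedIndex + 1. */
class NaiveRecoveryDemo {
    /** Returns the keys that are missing from the storage after recovery. */
    static List<String> run() {
        List<String[]> raftLog = List.of(
            new String[] {"k1", "v1"}, new String[] {"k2", "v2"},
            new String[] {"k3", "v3"}, new String[] {"k4", "v4"});
        BufferedStorage storage = new BufferedStorage();
        long appliedIndex = 0; // assumed to be persisted by raft

        for (String[] cmd : raftLog) {
            storage.insert(cmd[0], cmd[1]);
            appliedIndex++;
            if (appliedIndex == 3) {
                storage.flush(); // the checkpoint covers indexes 1..3 only
            }
        }

        storage.crash(); // index 4 was acknowledged but never flushed

        // Naive recovery: replay only the entries after appliedIndex (indexes are 1-based).
        for (long idx = appliedIndex + 1; idx <= raftLog.size(); idx++) {
            String[] cmd = raftLog.get((int) idx - 1);
            storage.insert(cmd[0], cmd[1]);
        }

        List<String> lost = new ArrayList<>();
        for (String[] cmd : raftLog) {
            if (!storage.contains(cmd[0])) {
                lost.add(cmd[0]);
            }
        }
        return lost;
    }

    public static void main(String[] args) {
        System.out.println("Lost after naive recovery: " + run());
    }
}
```

In this sketch run() returns [k4]: the command was acknowledged and counted in appliedIndex, but it was never flushed, and the replay loop has nothing left to apply.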
Possible solutions:
There are several ways to solve this issue:
- In-storage WAL. Bad solution, because there's already a raft log that can be used as a WAL. Such duplication is redundant.
- Local recovery starting from appliedIndex - mutatorsBuffer.size. Bad solution. It won't work for non-idempotent operations, and it exposes inner storage details such as mutatorsBuffer.size.
- proposedIndex propagation + checkpointIndex synchronization. Seems fine. More details below:
  - First of all, in order to coordinate the raft replicator and the storage, proposedIndex should be propagated to raftGroupListener and the storage.
  - On every checkpoint, the storage will persist the corresponding proposed index as checkpointIndex.
  - For inner storage checkpoints, the storage won't notify the raft replicator about the new checkpointIndex. Such a notification would only be an optimization and does not affect the correctness of the protocol.
  - For an outer checkpoint request, e.g. raft snapshotting for the purposes of raft log truncation, the corresponding checkpointIndex will be propagated to the raft replicator within the "onSnapshotDone" callback.
  - During local recovery raft will apply raft log entries from the very beginning. If checkpointIndex turns out to be greater than the proposedIndex of the raft log entry being applied, the storage fails the proposed closure with IndexMismatchException(checkpointIndex), which leads to a proposedIndex shift and an optional asynchronous raft log truncation.
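The third option can be sketched roughly as follows. This is a simplified model under stated assumptions, not the actual implementation: CheckpointingStorage, LocalRecovery and RecoveryDemo are hypothetical names, IndexMismatchException is modelled as an unchecked exception instead of a failed closure, and the sketch treats an index equal to checkpointIndex as already applied.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Signals that the proposed index is already covered by a checkpoint. */
class IndexMismatchException extends RuntimeException {
    final long checkpointIndex;

    IndexMismatchException(long checkpointIndex) {
        super("Storage is already checkpointed up to index " + checkpointIndex);
        this.checkpointIndex = checkpointIndex;
    }
}

/** Hypothetical storage that persists checkpointIndex together with the data. */
class CheckpointingStorage {
    private final Map<String, String> checkpointedData = new HashMap<>(); // durable
    private final Map<String, String> data = new HashMap<>();             // in memory
    private long checkpointIndex;   // durable, written together with checkpointedData
    private long lastProposedIndex; // in memory

    void insert(String key, String value, long proposedIndex) {
        if (proposedIndex <= checkpointIndex) {
            throw new IndexMismatchException(checkpointIndex);
        }
        data.put(key, value);
        lastProposedIndex = proposedIndex;
    }

    void checkpoint() {
        checkpointedData.putAll(data);
        checkpointIndex = lastProposedIndex;
    }

    /** Node restart: only the checkpointed state survives. */
    void restart() {
        data.clear();
        data.putAll(checkpointedData);
        lastProposedIndex = checkpointIndex;
    }

    String get(String key) {
        return data.get(key);
    }

    long checkpointIndex() {
        return checkpointIndex;
    }
}

/** Hypothetical recovery loop on the raft side. */
class LocalRecovery {
    /** Replays the log from the very beginning; returns the indexes actually applied. */
    static List<Long> replay(List<String[]> raftLog, CheckpointingStorage storage) {
        List<Long> applied = new ArrayList<>();
        long index = 1; // raft doesn't know checkpointIndex at this point
        while (index <= raftLog.size()) {
            String[] cmd = raftLog.get((int) index - 1);
            try {
                storage.insert(cmd[0], cmd[1], index);
                applied.add(index);
                index++;
            } catch (IndexMismatchException e) {
                index = e.checkpointIndex + 1; // proposedIndex shift
            }
        }
        return applied;
    }
}

class RecoveryDemo {
    public static void main(String[] args) {
        List<String[]> raftLog = List.of(
            new String[] {"k1", "v1"}, new String[] {"k2", "v2"},
            new String[] {"k3", "v3"}, new String[] {"k4", "v4"});
        CheckpointingStorage storage = new CheckpointingStorage();
        for (int i = 0; i < 3; i++) {
            storage.insert(raftLog.get(i)[0], raftLog.get(i)[1], i + 1);
        }
        storage.checkpoint();           // checkpointIndex becomes 3
        storage.insert("k4", "v4", 4);  // stays in memory only
        storage.restart();              // node failure and restart
        System.out.println("Re-applied indexes: " + LocalRecovery.replay(raftLog, storage));
    }
}
```

The replay loop always makes progress, because the exception is raised only for indexes at or below checkpointIndex and the next attempt starts at checkpointIndex + 1. The asynchronous log truncation mentioned above is left out of this sketch.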
Let's consider the following example:
checkpointBuffer = 3. [P] - persisted entities, [!P] - not persisted (in-memory) ones.
- raft.put(k1,v1)
- -> raftlog[cmd(k1,v1, index:1)]
- -> storage[(k1,v1), index:1]
- -> appliedIndex:1
- raft.put(k2,v2)
- -> raftlog[cmd(k1,v1, index:1), cmd(k2,v2, index:2)]
- -> storage[(k1,v1), (k2,v2), index:2]
- -> appliedIndex:2
- raft.put(k3,v3)
- -> raftlog[cmd(k1,v1, index:1), cmd(k2,v2, index:2), cmd(k3,v3, index:3)]
- -> storage[(k1,v1), (k2,v2), (k3,v3), index:3]
- -> appliedIndex:3
- inner storage checkpoint
- raftlog[cmd(k1,v1, index:1), cmd(k2,v2, index:2), cmd(k3,v3, index:3)]
- storage[(k1,v1, proposedIndex:1), (k2,v2, proposedIndex:2), (k3,v3, proposedIndex:3)]
- checkpointedData[(k1,v1), (k2,v2), (k3,v3), checkpointIndex:3]
- raft.put(k4,v4)
- -> raftlog[cmd(k1,v1, index:1), cmd(k2,v2, index:2), cmd(k3,v3, index:3), cmd(k4,v4, index:4)]
- -> storage[(k1,v1), (k2,v2), (k3,v3), (k4,v4), index:4]
- -> checkpointedData[(k1,v1), (k2,v2), (k3,v3), checkpointIndex:3]
- -> appliedIndex:4
- Node failure
- Node restart
- StorageRecovery: storage.apply(checkpointedData)
- raft-to-storage data application starting from index: 1 // raft doesn't know checkpointIndex at this point.
- -> storageResponse::IndexMismatchException(3)
- raft-to-storage data application starting from index: 3 + 1
- -> storageResponse::IndexMismatchException(3)
- Recovery result:
- -> raftlog[cmd(k1,v1, index:1), cmd(k2,v2, index:2), cmd(k3,v3, index:3), cmd(k4,v4, index:4)]
- -> storage[(k1,v1), (k2,v2), (k3,v3), (k4,v4), index:4]
- -> checkpointedData[(k1,v1), (k2,v2), (k3,v3), checkpointIndex:3]
- -> appliedIndex:4
- Raft log truncation
- storage.forceCheckpoint
- -> raftlog[index:4]
- -> storage[(k1,v1), (k2,v2), (k3,v3), (k4,v4), index:4]
- -> checkpointedData[(k1,v1), (k2,v2), (k3,v3), (k4,v4), checkpointIndex:4]
- -> appliedIndex:4
- storage.forceCheckpoint
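The last step of the example, raft log truncation after a forced checkpoint, might look like the sketch below. SimpleRaftLog is a hypothetical in-memory log, and the onSnapshotDone method only mirrors the callback named in the description. Keeping the entry at checkpointIndex itself is an assumption taken from the raftlog[index:4] line of the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical in-memory raft log keyed by entry index. */
class SimpleRaftLog {
    private final TreeMap<Long, String> entries = new TreeMap<>();

    void append(long index, String command) {
        entries.put(index, command);
    }

    /**
     * Called once an outer (forced) checkpoint has persisted everything up to checkpointIndex.
     * Entries strictly below checkpointIndex are no longer needed for local recovery.
     */
    void onSnapshotDone(long checkpointIndex) {
        // headMap returns a view of the log, so clearing it removes the entries from the log.
        entries.headMap(checkpointIndex, false).clear();
    }

    List<Long> indexes() {
        return new ArrayList<>(entries.keySet());
    }
}

class TruncationDemo {
    public static void main(String[] args) {
        SimpleRaftLog log = new SimpleRaftLog();
        for (long i = 1; i <= 4; i++) {
            log.append(i, "cmd(k" + i + ",v" + i + ")");
        }
        // storage.forceCheckpoint is assumed to have completed with checkpointIndex = 4.
        log.onSnapshotDone(4);
        System.out.println("Remaining raft log indexes: " + log.indexes());
    }
}
```

After the callback only index 4 remains in the log, which matches the raftlog[index:4] state shown in the example.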
Attachments
Issue Links
- blocks: IGNITE-17302 Raft listener multi-storage support (Resolved)
- Dependent: IGNITE-17077 Implement checkpointIndex for PDS (Resolved)
- Dependent: IGNITE-17081 Implement checkpointIndex for RocksDB (Resolved)
- links to