Details
- Type: Improvement
- Priority: Major
- Status: Resolved
- Resolution: Fixed
Description
General context
According to the tx design document, a new abstraction called a primary replica is introduced to encapsulate the replication engine (e.g. Raft) from the business logic:
A primary replica is a replica which serves a special purpose in the transaction protocol. Only one primary replica can exist at a time. Each replica is identified by a liveness interval (startTs, endTs). All such intervals are disjoint, so the new primary replica's liveness interval can't overlap with the previous one. Timestamps used for defining the intervals must be comparable with timestamps assigned to committing transactions. For example, HLC timestamps can be used for this purpose. The primary replica is used to execute the CC protocol (so all reads and writes go through it), thus maintaining serializable executions, as described in the next section. The simplest implementation would be piggy-backing on the RAFT protocol by tying a primary replica to a RAFT leader. See the leaseholder section of the RAFT paper for details. With this approach, a RAFT leader is identical to a primary replica node. The endTs is constantly extended using RAFT heartbeating. A primary replica's status can be voluntarily transferred to another replica. This is only possible after its liveness interval expires. This can be useful, for example, for RAFT leader balancing.
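As an illustration of the liveness-interval idea, here is a minimal Java sketch. The LivenessInterval name and the use of a plain long as an HLC-comparable timestamp are assumptions for illustration only, not part of the proposed interfaces:

    // Sketch only: a primary-replica lease as a liveness interval (startTs, endTs).
    // Timestamps are assumed to be HLC-based and comparable with transaction
    // commit timestamps; a plain long is used here as a stand-in.
    record LivenessInterval(long startTs, long endTs) {
        /** The replica is primary exactly while the interval covers the timestamp. */
        boolean covers(long ts) {
            return startTs <= ts && ts < endTs;
        }

        /** Intervals of successive primaries must be disjoint: the new interval
            may only start after the previous one has ended. */
        boolean isValidSuccessorOf(LivenessInterval previous) {
            return startTs >= previous.endTs;
        }
    }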
Besides the obvious lease-based disjoint replication leader detection, the primary replica is also responsible for handling messages, acting as a storage and replication pre- and post-processor. It is up to the replica to (see the sketch after this list):
- acquire, release and await locks;
- propagate requests to the storage directly;
- convert a message to an appropriate replication (Raft) command and propagate it to the replication engine.
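A rough Java sketch of this routing follows; all types here (Request, LockManager, Storage, ReplicationEngine) are illustrative stand-ins rather than the actual API:

    import java.util.concurrent.CompletableFuture;

    // Illustrative stand-ins, not the proposed interfaces.
    class ReplicaDispatchSketch {
        interface Request { String key(); }
        interface ReadRequest extends Request { }
        interface WriteRequest extends Request { }
        interface ReplicationCommand { }

        interface LockManager {
            CompletableFuture<Void> acquireSharedLock(String key);
            CompletableFuture<Void> acquireExclusiveLock(String key);
        }

        interface Storage { Object read(String key); }

        interface ReplicationEngine {
            CompletableFuture<Object> replicate(ReplicationCommand command);
        }

        private final LockManager lockManager;
        private final Storage storage;
        private final ReplicationEngine replicationEngine;

        ReplicaDispatchSketch(LockManager lockManager, Storage storage, ReplicationEngine replicationEngine) {
            this.lockManager = lockManager;
            this.storage = storage;
            this.replicationEngine = replicationEngine;
        }

        CompletableFuture<Object> handle(Request request) {
            if (request instanceof WriteRequest) {
                // Writes: acquire the lock locally (awaiting happens outside of
                // the replication engine), then convert the request into a
                // replication (Raft) command and hand it over for replication.
                return lockManager.acquireExclusiveLock(request.key())
                    .thenCompose(ignored -> replicationEngine.replicate(toCommand((WriteRequest) request)));
            }

            // Reads: acquire the shared lock and serve from the local storage directly.
            return lockManager.acquireSharedLock(request.key())
                .thenApply(ignored -> storage.read(request.key()));
        }

        private ReplicationCommand toCommand(WriteRequest request) {
            return new ReplicationCommand() { }; // placeholder conversion
        }
    }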
Let's look at the following example.
As-Is (currently):
// client-side
InternalTable.upsert()
    enlistInTx()
    raftService.run(upsertCommand)
        raftGroupService.sendWithRetry(ActionRequest.of(upsertCommand))
            messagingService().invoke(actionRequest)

// server-side
ActionRequestProcessor.handleRequest(actionRequest)
    // Lock management
    future = JraftServerImpl.DelegatingStateMachine.getListener().onBeforeApply(request.command());
    future.handle(actionRequest.command() instanceof WriteCommand
        ? applyWrite(actionRequest)
        : applyRead(actionRequest))
Please pay attention to the onBeforeApply step. It was introduced in order to manage (acquire) locks, with the subsequent lock awaiting happening outside of Raft. It is critical not to occupy the linearized in-Raft execution with such lengthy operations as waiting for locks to be released.
It is worth mentioning that this approach has several disadvantages, e.g. the onBeforeApply step is executed before the isLeader() check, so it might acquire a lock on a non-leader node, which is not the expected behavior.
To-Be (should be implemented):
// client-side
InternalTable.upsert()
    enlistInTx()
    replicaService.invoke(upsertRequest, primary=true)

// server-side
Replica.handleRequest(actionRequest)
    if (actionRequest.isPrimaryEvaluationExpected())
        checkLease(); // Return failure if not valid

    if (actionRequest instanceof WriteRequest) {
        // validate writeRequest locally
        // acquire all locks !locally!
        fut = txManager.intentWriteLock(table);
        fut.handle(() ->
            future.of(async(replicationEngine.replicate(ReplicationCommand.of(writeRequest)))))
    }
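Note the key difference from the As-Is flow: lock acquisition and awaiting now happen on the replica before anything is handed to the replication engine, so the linearized in-Raft execution is never blocked on locks, and the lease check ensures that locks are only acquired on a valid primary.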
In other words:
- Instead of raftGroupService, replicaService should be used.
- ReplicaService uses messages (actionRequests) instead of Raft commands.
- Within the scope of RW transactions, replicaService always sends requests to the primary replica; however, within RO transactions non-primary replicas also participate in request handling, so I believe we should introduce a common Replica instead of a strict PrimaryReplica.
- Replica is aware of the request handling logic and processes actions in a way similar to what raftGroupListener does; that means that, in addition to RaftGroupListener, a ReplicaListener will be introduced (personally, I don't like the term Listener here); see the sketch below this list.
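Under this naming, the server-side pair might look roughly like the following Java sketch; all signatures are illustrative guesses, not the final interfaces:

    import java.util.concurrent.CompletableFuture;

    /** Sketch: a replica request, playing the role of the actionRequest above. */
    interface ReplicaRequest { }

    /** Sketch: the replica-side counterpart of RaftGroupListener. */
    interface ReplicaListener {
        /** Processes an incoming request and completes with the operation result. */
        CompletableFuture<Object> invoke(ReplicaRequest request);
    }

    /** Sketch: a replica that performs lease/primary checks where required and
        delegates the actual processing to its listener. */
    class Replica {
        private final ReplicaListener listener;

        Replica(ReplicaListener listener) {
            this.listener = listener;
        }

        CompletableFuture<Object> processRequest(ReplicaRequest request) {
            // A lease validity check for primary-only requests would go here.
            return listener.invoke(request);
        }
    }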
TODO
Within the scope of the given ticket, interfaces for Replica, ReplicaService and ReplicaListener should be introduced. The given classes should be sufficient to replace direct interaction with Raft. Please pay attention that, from the service point of view, we will need methods for the following (see the interface sketch after the list):
- Dedicated replica instance communication, e.g. invoke should have an overloaded version with a nodeId parameter. It is not yet clear whether it will be nodeId itself, peerId or some other replicaId locator.
- Primary replica detection.
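To make these requirements concrete, one possible shape of the client-facing service is sketched below; the method names and the String locator are assumptions only:

    import java.util.concurrent.CompletableFuture;

    /** Sketch: client-side façade replacing direct RaftGroupService interaction.
        ReplicaRequest is the marker interface from the previous sketch. */
    interface ReplicaService {
        /** Sends a request to the current primary replica of the target group. */
        <R> CompletableFuture<R> invoke(ReplicaRequest request);

        /** Overload for communicating with a dedicated replica instance, e.g. a
            non-primary replica within an RO transaction; whether the locator is
            a nodeId, a peerId or some other replicaId is an open question. */
        <R> CompletableFuture<R> invoke(String nodeId, ReplicaRequest request);

        /** Primary replica detection for the given replication group. */
        CompletableFuture<String> primaryReplica(String replicationGroupId);
    }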
Issue Links
- blocks
    - IGNITE-17378 Check the replica is a primary before processing request at Replica (Resolved)
    - IGNITE-17375 Add description to replication module (Open)
    - IGNITE-17255 Implement ReplicaService (Resolved)
    - IGNITE-17257 Implement Replica server-side logic (Resolved)
    - IGNITE-17258 Implement ReplicaListener (Resolved)
    - IGNITE-17056 Design rebalance cancel mechanism (Resolved)