  Ignite / IGNITE-16668

Design in-memory raft group reconfiguration on node failure


Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed

    Description

      If a node storing a partition of an in-memory table fails and leaves the cluster, all the data it had is lost. From the point of view of the partition, it looks as if the node has left forever.

      Although the Raft protocol tolerates losing some of the nodes composing a Raft group (partition), for in-memory caches we cannot restore the replica factor because of the in-memory nature of the table.

      This means that we need to detect the failure of each node owning a partition and recalculate the assignments for the table without keeping the replica factor.

      Upd 1:

      Problem

      By design, Raft has several persisted segments, e.g. the Raft meta (term/committedIndex) and the stable Raft log. So, by converting common Raft to an in-memory one, it's possible to break some of its invariants. For example, node C could vote for candidate A before a self-restart and then vote for candidate B after it. As a result, two leaders would be elected, which is illegal.
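      The following is a minimal, self-contained sketch of the broken invariant in plain Java (not Ignite code; the InMemoryVoter class and its methods are hypothetical and exist only for illustration). Because the "voted for" record lives only in memory, a restart wipes it and the node can grant a second vote in the same term:

      import java.util.HashMap;
      import java.util.Map;

      /** Toy model of a voter whose "voted for" record is kept only in memory. */
      class InMemoryVoter {
          // term -> candidate this node voted for; lost on restart because it is not persisted.
          private Map<Integer, String> votedFor = new HashMap<>();

          boolean grantVote(int term, String candidate) {
              // Raft invariant: at most one vote per term.
              return votedFor.putIfAbsent(term, candidate) == null;
          }

          void restart() {
              // The in-memory state is wiped, so the vote history disappears with it.
              votedFor = new HashMap<>();
          }

          public static void main(String[] args) {
              InMemoryVoter nodeC = new InMemoryVoter();
              System.out.println(nodeC.grantVote(5, "A")); // true: C votes for candidate A in term 5
              nodeC.restart();                             // C restarts and forgets its vote
              System.out.println(nodeC.grantVote(5, "B")); // true again: a second vote in the same term
              // With C's double vote, both A and B can collect a majority in term 5 -> two leaders.
          }
      }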

      Solution

      In order to solve the problem mentioned above, it's possible to remove the restarting node from the peers of the corresponding Raft group and then return it back. The peer-removal process should be finished before the corresponding Raft server node is restarted.

      The process of removing and then returning the restarting node is, however, tricky in itself. To explain why it is a non-trivial action, it's necessary to reveal the main ideas of the rebalance protocol.

      Reconfiguration of the Raft group is a process driven by changes of the assignments. Each partition has three corresponding sets of assignments stored in the metastore (see the key-layout sketch after this list):

      1. assignments.stable - the current distribution.
      2. assignments.pending - the partition distribution for an ongoing rebalance, if any.
      3. assignments.planned - in some cases it's not possible to cancel or merge a pending rebalance with a new one. In that case the newly calculated assignments will be stored explicitly under the corresponding assignments.planned key. It's worth noting that it doesn't make sense to keep more than one planned rebalance: any newly scheduled one overwrites the already existing one.
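      For reference, a minimal sketch of how these per-partition keys could be modeled in Java (the prefix format and the helper class are assumptions made for illustration; they are not the actual Ignite key layout):

      /** Hypothetical helper that builds the metastore key names for one partition. */
      final class RebalanceKeys {
          private final String prefix;

          RebalanceKeys(String tableId, int partId) {
              this.prefix = tableId + ".partition." + partId + ".assignments";
          }

          String stable()  { return prefix + ".stable";  } // current distribution
          String pending() { return prefix + ".pending"; } // ongoing rebalance, if any
          String planned() { return prefix + ".planned"; } // single planned rebalance, overwritten by newer ones
      }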

      However, such an idea of overwriting the assignments.planned key won't work within the context of an in-memory Raft restart, because it's not valid to overwrite the reduction of assignments. Let's illustrate this problem with the following example.

      1. In-memory partition p1 is hosted on nodes A, B and C, meaning that p1.assignments.stable=[A,B,C].
      2. Let's say that the baseline was changed, resulting in a rebalance on assignments.pending=[A,B,C,D].
      3. During the non-cancelable phase of [A,B,C]->[A,B,C,D], node C fails and returns back, meaning that we should plan the [A,B,D] and [A,B,C,D] assignments. Both must be recorded in the only assignments.planned key, meaning that [A,B,C,D] will overwrite the reduction [A,B,D], so no actual Raft reconfiguration will take place, which is not acceptable.

      In order to overcome the given issue, let's introduce two new keys: assignments.switch.reduce, which will hold the nodes that should be removed, and assignments.switch.append, which will hold the nodes that should be returned back, and run the following actions:

      On in-memory partition restart (or on partition start with cleaned-up PDS)

      Within a retry loop, add the current node to the assignments.switch.reduce set (a Java sketch of this loop follows the pseudocode below):

      do {
           retrievedAssignmentsSwitchReduce = metastorage.read(assignments.switch.reduce);
           calculatedAssignmentsSwitchReduce = union(retrievedAssignmentsSwitchReduce.value, currentNode);
      
           if (retrievedAssignmentsSwitchReduce.isEmpty()) {
               invokeRes = metastoreInvoke:
                   if empty(assignments.switch.reduce)
                       assignments.switch.reduce = calculatedAssignmentsSwitchReduce 
           } else {
               invokeRes = metastoreInvoke:
                   eq(revision(assignments.switch.reduce), retrievedAssignmentsSwitchReduce.revision)
                       assignments.switch.reduce = calculatedAssignmentsSwitchReduce 
           }
      } while (!invokeRes);
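      A compact Java sketch of the same loop against a deliberately simplified metastorage interface (MetaStorage, Entry, putIfAbsent and putIfRevisionEquals are assumptions introduced only for this illustration; they are not the real metastorage API):

      import java.util.HashSet;
      import java.util.Set;

      /** Simplified metastorage view: a versioned read plus conditional writes. */
      interface MetaStorage {
          Entry read(String key);

          /** Writes the value only if the key is still absent; returns true on success. */
          boolean putIfAbsent(String key, Set<String> value);

          /** Writes the value only if the key revision is unchanged; returns true on success. */
          boolean putIfRevisionEquals(String key, long expectedRevision, Set<String> value);
      }

      /** A read result: the stored node set and the revision it was read at. */
      record Entry(Set<String> value, long revision) {
          boolean isEmpty() {
              return value == null || value.isEmpty();
          }
      }

      class SwitchReduceUpdater {
          /** Adds the restarting node to assignments.switch.reduce with optimistic retries. */
          static void addCurrentNode(MetaStorage metastorage, String switchReduceKey, String currentNode) {
              boolean invokeRes;

              do {
                  Entry retrieved = metastorage.read(switchReduceKey);

                  // union(retrievedAssignmentsSwitchReduce.value, currentNode)
                  Set<String> calculated = retrieved.isEmpty() ? new HashSet<>() : new HashSet<>(retrieved.value());
                  calculated.add(currentNode);

                  // Mirror the pseudocode: create the key if it is still empty, otherwise
                  // update it only if nobody has changed it since we read it.
                  invokeRes = retrieved.isEmpty()
                          ? metastorage.putIfAbsent(switchReduceKey, calculated)
                          : metastorage.putIfRevisionEquals(switchReduceKey, retrieved.revision(), calculated);
              } while (!invokeRes);
          }
      }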
      On assignments.switch.reduce change on corresponding partition leader

      Within a watch listener on the assignments.switch.reduce key on the corresponding partition leader, we trigger a new rebalance if there is no pending one (see the sketch after the pseudocode below).

      calculatedAssignments = subtract(calcPartAssignments(), assignments.switch.reduce);
      
      metastoreInvoke:
          if empty(partition.assignments.change.trigger.revision) || partition.assignments.change.trigger.revision < event.revision
              if empty(assignments.pending)
                  assignments.pending = calculatedAssignments
                  partition.assignments.change.trigger.revision = event.revision
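      The trigger condition from the invoke above, extracted as a pure function (plain Java; the parameters stand in for calcPartAssignments(), the metastore values and the event revision, which is an assumption about how the listener obtains them):

      import java.util.HashSet;
      import java.util.Optional;
      import java.util.Set;

      class SwitchReduceListener {
          /**
           * Decides whether the assignments.switch.reduce change should start a new rebalance.
           * Returns the new pending assignments, or empty if nothing should be triggered.
           */
          static Optional<Set<String>> onSwitchReduceChanged(
                  Set<String> calcPartAssignments,  // assignments recalculated by the affinity function
                  Set<String> switchReduce,         // nodes that must be removed
                  Set<String> currentPending,       // assignments.pending, empty set if absent
                  Long changeTriggerRevision,       // partition.assignments.change.trigger.revision, null if absent
                  long eventRevision) {
              boolean triggerIsStale = changeTriggerRevision == null || changeTriggerRevision < eventRevision;

              // Only trigger if there is no ongoing rebalance and this event has not been handled yet.
              if (!triggerIsStale || !currentPending.isEmpty()) {
                  return Optional.empty();
              }

              Set<String> pending = new HashSet<>(calcPartAssignments);
              pending.removeAll(switchReduce); // subtract(calcPartAssignments(), assignments.switch.reduce)

              return Optional.of(pending);
          }
      }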
      
      On rebalance done

      changePeers() calls the onRebalanceDone closure with the set of newPeers (the set arithmetic used below is also extracted into a standalone Java sketch after this block):

      do {
          retrievedAssignments[Stable, Pending, SwitchReduce, SwitchAppend, Planned] = readRebalanceKeys();
          resolvedStable = resolveClusterNodes(newPeers);

          // Were reduced
          reducedNodes = subtract(retrievedAssignmentsSwitchReduce, resolvedStable);

          // Were added
          addedNodes = subtract(resolvedStable, retrievedAssignmentsStable);

          // For further reduction
          calculatedAssignmentsSwitchReduce = subtract(retrievedAssignmentsSwitchReduce, reducedNodes);

          // For further addition
          calculatedAssignmentsSwitchAppend = union(retrievedAssignmentsSwitchAppend, reducedNodes);
          calculatedAssignmentsSwitchAppend = subtract(calculatedAssignmentsSwitchAppend, addedNodes);
          calculatedAssignmentsSwitchAppend = intersect(calcPartAssignments(), calculatedAssignmentsSwitchAppend);

          calculatedPendingReduction = subtract(resolvedStable, retrievedAssignmentsSwitchReduce);

          calculatedPendingAddition = union(resolvedStable, reducedNodes);
          calculatedPendingAddition = intersect(calcPartAssignments(), calculatedPendingAddition);

          // Retry preconditions should be checked inside every invoke.
          <retryPreconditions>
             eq(revision(assignments.stable), retrievedAssignmentsStable.revision) and
             eq(revision(assignments.pending), retrievedAssignmentsPending.revision) and
             eq(revision(assignments.switch.reduce), retrievedAssignmentsSwitchReduce.revision) and
             eq(revision(assignments.switch.append), retrievedAssignmentsSwitchAppend.revision)
          </retryPreconditions>

          // There are nodes that should be returned back
          if (!empty(calculatedAssignmentsSwitchAppend)) {
              invokeRes = metastoreInvoke:
                  if (<inline retryPreconditions>)
                       assignments.stable = resolvedStable
                       assignments.pending = calculatedPendingAddition
                       assignments.switch.reduce = calculatedAssignmentsSwitchReduce
                       assignments.switch.append = calculatedAssignmentsSwitchAppend
          } else if (!empty(calculatedAssignmentsSwitchReduce)) {
               invokeRes = metastoreInvoke:
                   if (<inline retryPreconditions>)
                      assignments.stable = resolvedStable
                      assignments.pending = calculatedPendingReduction
                      assignments.switch.reduce = calculatedAssignmentsSwitchReduce
                      assignments.switch.append = calculatedAssignmentsSwitchAppend
          } else {
              invokeRes = metastoreInvoke:
                  if (<inline retryPreconditions>)
                      // Common rebalance actions, moving planned to pending if any...
                      +
                      assignments.switch.append = calculatedAssignmentsSwitchAppend
           }
      } while (!invokeRes);
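      Since the branching between the three invokes is driven purely by set arithmetic, here is that arithmetic as standalone Java (a sketch useful for unit-testing the math; the Result record and method names are introduced for illustration and are not Ignite code):

      import java.util.HashSet;
      import java.util.Set;

      /** Pure set arithmetic from the onRebalanceDone pseudocode above; nothing here touches storage. */
      class RebalanceDoneMath {
          record Result(
                  Set<String> switchReduce,      // nodes still waiting to be removed
                  Set<String> switchAppend,      // nodes waiting to be returned back
                  Set<String> pendingReduction,  // next pending assignments if we keep reducing
                  Set<String> pendingAddition) { // next pending assignments if we return nodes back
          }

          static Result compute(
                  Set<String> calcPartAssignments, // assignments recalculated by the affinity function
                  Set<String> resolvedStable,      // newPeers resolved to cluster nodes
                  Set<String> retrievedStable,
                  Set<String> retrievedSwitchReduce,
                  Set<String> retrievedSwitchAppend) {
              // Nodes that were actually removed by the finished rebalance.
              Set<String> reducedNodes = subtract(retrievedSwitchReduce, resolvedStable);

              // Nodes that were actually added by the finished rebalance.
              Set<String> addedNodes = subtract(resolvedStable, retrievedStable);

              // Still to be removed later.
              Set<String> switchReduce = subtract(retrievedSwitchReduce, reducedNodes);

              // To be returned back: previously appended nodes plus the just reduced ones,
              // minus what has already been added, limited to the calculated assignments.
              Set<String> switchAppend = union(retrievedSwitchAppend, reducedNodes);
              switchAppend = subtract(switchAppend, addedNodes);
              switchAppend = intersect(calcPartAssignments, switchAppend);

              Set<String> pendingReduction = subtract(resolvedStable, retrievedSwitchReduce);

              Set<String> pendingAddition = union(resolvedStable, reducedNodes);
              pendingAddition = intersect(calcPartAssignments, pendingAddition);

              return new Result(switchReduce, switchAppend, pendingReduction, pendingAddition);
          }

          static Set<String> union(Set<String> a, Set<String> b) {
              Set<String> r = new HashSet<>(a); r.addAll(b); return r;
          }

          static Set<String> subtract(Set<String> a, Set<String> b) {
              Set<String> r = new HashSet<>(a); r.removeAll(b); return r;
          }

          static Set<String> intersect(Set<String> a, Set<String> b) {
              Set<String> r = new HashSet<>(a); r.retainAll(b); return r;
          }
      }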

       It's also possible to update onCommonRebalance in a similar way; however, it's only an optimization, because onRebalanceDone will always consider the switch as a higher-priority action in comparison to a planned rebalance triggered by a baseline change or a replica factor change.

      Besides that, in many cases, especially for in-memory partitions, it is worth removing the leaving node from the Raft group peers not on its restart but during the corresponding networkTopology.onDisappeared() event. However, it's only an optimization: e.g. in the case of a persisted table with a cleaned-up PDS it's not known whether the PDS was cleaned up until the node's self-check on table start-up. Thus, the correctness of the algorithm is achieved by the actions taken exclusively when the node is restarted; the rest is an optimization.

      All in all, the general flow on node restart will look like the following:

      private void onPartitionStart(Partition p) {
              if (p.table().inMemory() || !p.localStorage().isAvailable()) {
                  if (p.isMajorityAvailable()) {
                          <inline metaStorageInvoke*>
                          // Do not start the corresponding raft server!
                  } else if (fullPartitionRestart) {
                          commonStart();
                  } else {
                          // Do nothing.
                  }
              } else {
                  commonStart();
              }
      }
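      The same start-up branching, written as a standalone function that returns a decision instead of performing it (the PartitionStartAction enum and the boolean parameters are assumptions introduced only to make the branches explicit):

      enum PartitionStartAction {
          SWITCH_PEERS_ONLY, // run the metastore invoke above; do not start the raft server yet
          COMMON_START,      // regular start of the partition's raft server
          DO_NOTHING         // wait: no majority and not a full partition restart
      }

      class PartitionStartDecision {
          static PartitionStartAction decide(
                  boolean inMemoryTable,
                  boolean localStorageAvailable,
                  boolean majorityAvailable,
                  boolean fullPartitionRestart) {
              // In-memory table, or a persistent one whose local storage (PDS) was cleaned up.
              if (inMemoryTable || !localStorageAvailable) {
                  if (majorityAvailable) {
                      return PartitionStartAction.SWITCH_PEERS_ONLY;
                  } else if (fullPartitionRestart) {
                      return PartitionStartAction.COMMON_START;
                  } else {
                      return PartitionStartAction.DO_NOTHING;
                  }
              }

              return PartitionStartAction.COMMON_START;
          }
      }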

      Attachments

        1. Screenshot from 2022-04-19 11-11-05.png (24 kB, Alexander Lapin)
        2. Screenshot from 2022-04-19 11-12-55.png (29 kB, Alexander Lapin)
        3. Screenshot from 2022-04-19 11-12-55-1.png (29 kB, Alexander Lapin)



            People

              alapin Alexander Lapin
              sergeychugunov Sergey Chugunov
              Kirill Gusakov