Uploaded image for project: 'Cassandra'
  1. Cassandra
  2. CASSANDRA-16909

☂ Medium Term Repair Improvements



    • Epic
    • Status: In Progress
    • Normal
    • Resolution: Unresolved
    • None
    • Consistency/Repair
    • None
    • ☂ Medium Term Repair Improvements
    • Operability
    • Normal
    • All
    • None


      This is to track the repair improvement works defined on the dev list.

      Email was: [DISCUSS] Repair Improvement Proposal

      This JIRA will track the related tasks and be used for documentaiton

      Repair Coordinator State
      1. ActiveRepairService.recordRepairStatus
          1. Maps JMX command (int) to (status, msg)
      2. Get/validate ColumnFamilies
          1. Skip if no ColumnFamilies to repair
      3. Get replicas for ranges
          1. Ignore ranges not covered by this instance IFF --ignore-unreplicated-keyspaces, else Fail
          2. Skip if no ranges to repair
          3. If --force filter out non-alive (FailureDetector.isAlive) participants
      4. [Not PREVIEW]Update SystemDistributedKeyspace's parent_repair_history
      5. ActiveRepairService.prepareForRepair
          1. TODO Should this be under PREPARE or still part of validation?
              1. If CompactionsPendingThreshold triggered, Fail
              2. registerParentRepairSession - update map of UUID -> ParentRepairSession
          2. Send PREPARE_MSG awaiting responses [possible failures: timeout waiting, participate failure]
              1. [improvement] PREPARE_MSG should be idempotent and if no reply within X, retry Y times
              2. Known Failures; all are retryable at this stage
                  1. Timeout
                  2. Participate goes from alive to down
                  3. CompactionsPendingThreshold triggered
                      1. Not included in org.apache.cassandra.exceptions.RequestFailureReason#forException, so looks the same as Unexpected error
                          1. If updated and in mixed-mode, the update falls back to UNKNOWN, which then matches the unexpected error behavior
                  4. Unexpected error (this removes the session)
      6. Run RepairTask (normal, IR, preview); see coordinator state for each type
      7. On Success
          1. Update SystemDistributedKeyspace.parent_repair_history to show the successful ranges
          2. If any sub-session failed, fail the job
          3. ActiveRepairService.cleanUp - send CLEANUP_MSG to all participates to clean up
              1. TODO: why is this only on success and not failures as well?
              2. [improvement] - does not wait for ACK (though it is sent), we log it async if we get it (else timeout)
      8. On Exception
          1. fail
      Normal/Preview Repair Coordinator State
      1. For each common range
          1. ActiveRepairService.submitRepairSession
              1. Creates/run a RepairSession for each CommonRange
          2. Once all RepairSessions done
              1. [not consistent cross each type] handle session errors
              2. [Preview Repair] check SyncStatSummary for difference and send a human readable notification 
      Incremental Repair Coordinator State
      1. org.apache.cassandra.repair.consistent.CoordinatorSessions#registerSession to create CoordinatorSession
      2. CoordinatorSession.execute
          2. Await all success
          3. Trigger Normal Repair (see "Normal/Preview Repair Coordinator State")
          5. Await all success
          6. Send FINALIZE_COMMIT_MSG
              1. [bug][improvement] No ACK is done, so if this message is dropped then some participates will think the IR is still running (which can fail preview repairs)
      1. [Preview Repair - kind=REPAIRED] register with LocalSessions for IR state changes
      2. RepairSession.start
          1. [Not Preview Repair] Registering the session into SystemDistributedKeyspace's table repair_history
          2. If endpoints is empty
              1. [UNHANDLED - downstream logic does not handle this case] Set future with empty state (which is later seen as Failed... but not a failed future)
              2. [Not Preview Repair] Mark session failed in repair_history
          3. Check all endpoints, if any is down and hasSkippedReplicas=false, Fail the session
          4. For each table
              1. Create a RepairJob
              2. Execute job in RepairTask's executor
              3. await all jobs
                  1. If all success
                      1. Set session result to include the job results
                  2. If any fail
                      1. Fail the session future
                      2. [Question] why does this NOT update repair_history like other failures?
      1. [parallelism in [SEQUENTIAL, DATACENTER_AWARE] and not IR] send SNAPSHOT_MSG to all participates
          1. [improvement] SNAPSHOT_MSG should be idempotent so coordinator can retry if no ACK.  This calls org.apache.cassandra.repair.TableRepairManager#snapshot with the RepairRunnable ID; this makes sure to snapshot once unless force=true (RepairOptions !(dataCenters.isEmpty and hosts.isEmpty()))
          2. [improvement] This task is short lived, so rather than running in a cached pool (which may allocate a thread), inline or add a notation of cooperative sharing if retries are implemented
      2. Await snapshot success
      3. Send VALIDATION_REQ to all participates (all at once, or batched based off parallelism)
          1. [bug] MerkleTree may be large, even if off-heap; this can cause the coordinator to OOM (heap or direct); there is no bounds to the number of MerkleTree which may be in-flight
          2. [improvement] VALIDATION_REQ could be made idempotent.  Right now we create a Validator and submit to ValidationManager, but could dedupe based off session id
      4. Await validation success
      5. Stream any-all conflicting ranges (2 modes: optimiseStreams && not pullRepair = optimisedSyncing, else standardSyncing)
          1. Create a SyncTask (Local, Asymmetric, Symmetric) for each conflicting range
              1. Local: create stream plan and use streaming
              2. Asymmetric/Symmetric: send SYNC_REQ
                  1. [bug][improvement] No ACK is done, so if this message is dropped streaming does not start on the participate
                  2. [improvement] Both AsymmetricRemoteSyncTask, and SymmetricRemoteSyncTask are the same class; they are copy/paste clones of each other; the only difference is AsymmetricRemoteSyncTask creates the SyncRequest with asymmetric=true
                  3. [improvement] Can be idempotent (when remote); currently just starts streaming right away, would need to dedup on the session
      6. Await streaming complete
      7. onSuccess
          1. [not preview repair] update repair_history, marking the session success
          2. Set the future as success
      8. onFailure
          1. Abort validation tasks
          2. [not preview repair] update repair_history, marking the session failed
          3. Set the future to a failure
      Repair Participate State
      1. Receive PREPARE_MSG
          1. [improvement] PREPARE_MSG should be idempotent
              1. [current state] If parentRepairSessions contains the session, it ignores the request and noops; but does NOT validate that the sessions match
              2. [current state] mostly idempotent, assuming CompactionsPendingThreshold does not trigger
          1. Create LocalSession in-memory
          2. Persist LocalSession to system.repairs
          3. Create new Executor; 1 thread per table
              1. [improvement] the thread waits for org.apache.cassandra.db.repair.PendingAntiCompaction.AcquisitionCallable#acquireTuple to get called and rerun SSTables from Tracker
              2. [improvement] causes the active compactions (not Validations) to get interrupted, so if its running for a long time and generated a ton of garbage... it was done in waste
              3. [improvement] org.apache.cassandra.db.ColumnFamilyStore#runWithCompactionsDisabled is on a single table level but could be extended for multiple tables, this would allow only blocking on 1 thread as acquireTuple is just mutating org.apache.cassandra.db.lifecycle.Tracker
          4. Run PendingAntiCompaction
              1. Block/Interrupt compactions, to collect SSTables to AntiCompact
              2. Trigger org.apache.cassandra.db.compaction.CompactionManager#submitPendingAntiCompaction for each table
              3. Calls org.apache.cassandra.db.compaction.CompactionManager#performAnticompaction
                  1. Sets UNREPAIRED_SSTABLE for each SSTable
                  2. Group SStables
                  3. For each group, rewrite SSTable into 3 SSTables: full (repaired), transient (repaired), other (unprepared; out of range)
              4. On Success
                  1. Update system.repairs
                  2. Notify listeners (aka RepairSession)
                  3. Send PREPARE_CONSISTENT_RSP with body=true
              5. On Failure
                  1. Send PREPARE_CONSISTENT_RSP with body=false
                  2. Update system.repairs
      3. [parallelism in [SEQUENTIAL, DATACENTER_AWARE] and not IR] Receive SNAPSHOT_MSG
          1. Create a table snapshot using the RepairRunnable ID.  If !RepairOption.isGlobal, then override the snapshot if present
      4. Receive VALIDATION_REQ
          1. Creates a Validator and submits to CompactionManager's validationExecutor
          2. Core logic: org.apache.cassandra.repair.ValidationManager#doValidation
          3. Iterate over each partition/row, updating a MerkleTree
          4. When done, switch to the ANTI_ENTROPY stage
          5. If coordinator is remote
              1. Send a VALIDATION_RSP back with the MerkleTree (or null if failed)
          6. Else
              1. Switch to ANTI_ENTROPY again
              2. Attempt to move MerkleTree off-heap
              3. Forward message to ActiveRepairService.handleMessage
      5. [SyncTask in [AsymmetricRemoteSyncTask, SymmetricRemoteSyncTask]] Receive SYNC_REQ
          1. Creates a stream plan and use streaming (StreamingRepairTask)
      6. [IR] Receive FINALIZE_PROPOSE_MSG
          1. Update system.repairs
          2. Force flush table
      7. [IR] Receive FINALIZE_COMMIT_MSG
          1. Update system.repairs
      8. Receive CLEANUP_MSG
          1. Removes in-memory state and disk snapshots
      LocalSessions (IR)
      * On start, load state from system.repairs; mark active repairs as failed and sent FAILED_SESSION_MSG to coordinato
      * [every 10m; controlled by -Dcassandra.repair_cleanup_interval_seconds] for each LocalSession
          * If not updated within 1D (controlled by -Dcassandra.repair_fail_timeout_seconds), mark repair failed
          * Else if not updated within 1D (controlled by -Dcassandra.repair_delete_timeout_seconds)
              * Delete in-memory state
              * Delete system.repairs
          * Else if not updated within 1H (controlled by -Dcassandra.repair_status_check_timeout_seconds), request status update from all participants (get ConsistentSession.State)
              * Update system.repairs if any participants have state FINALIZED or FAILED
              * [bug][improvement] if Validation or Streaming is still running, this just updates the state but does not terminate active tasks
              * [improvement] if all participates are still running, we will check again in 10m
      * Streaming has docs on how it works, so this section just shows some of the special casing
      * [Preview Repair] org.apache.cassandra.db.streaming.CassandraStreamManager#createOutgoingStreams filters matches based off org.apache.cassandra.streaming.PreviewKind#predicate
      * [Preview Repair] skip streaming the files, only stream the metadata


        Issue Links



              dcapwell David Capwell
              dcapwell David Capwell
              David Capwell
              0 Vote for this issue
              7 Start watching this issue