  1. Apache Cassandra
  2. CASSANDRA-16909

☂ Medium Term Repair Improvements


Details

    • Epic
    • Status: In Progress
    • Normal
    • Resolution: Unresolved
    • None
    • Consistency/Repair
    • None
    • ☂ Medium Term Repair Improvements
    • Operability
    • Normal
    • All
    • None

    Description

      This is to track the repair improvement work defined on the dev list.

      Email was: [DISCUSS] Repair Improvement Proposal

      This JIRA will track the related tasks and be used for documentation.

      Repair Coordinator State
      1. ActiveRepairService.recordRepairStatus
          1. Maps JMX command (int) to (status, msg)
      2. Get/validate ColumnFamilies
          1. Skip if no ColumnFamilies to repair
      3. Get replicas for ranges
          1. Ignore ranges not covered by this instance IFF --ignore-unreplicated-keyspaces, else Fail
          2. Skip if no ranges to repair
          3. If --force, filter out non-alive (FailureDetector.isAlive) participants
      4. [Not PREVIEW] Update SystemDistributedKeyspace's parent_repair_history
      5. ActiveRepairService.prepareForRepair
          1. TODO Should this be under PREPARE or still part of validation?
              1. If CompactionsPendingThreshold triggered, Fail
              2. registerParentRepairSession - update map of UUID -> ParentRepairSession
          2. Send PREPARE_MSG and await responses [possible failures: timeout waiting, participant failure]
              1. [improvement] PREPARE_MSG should be idempotent and if no reply within X, retry Y times
              2. Known Failures; all are retryable at this stage
                  1. Timeout
                  2. Participant goes from alive to down
                  3. CompactionsPendingThreshold triggered
                      1. Not included in org.apache.cassandra.exceptions.RequestFailureReason#forException, so looks the same as Unexpected error
                          1. If updated and in mixed-mode, the update falls back to UNKNOWN, which then matches the unexpected error behavior
                  4. Unexpected error (this removes the session)
      6. Run RepairTask (normal, IR, preview); see coordinator state for each type
      7. On Success
          1. Update SystemDistributedKeyspace.parent_repair_history to show the successful ranges
          2. If any sub-session failed, fail the job
          3. ActiveRepairService.cleanUp - send CLEANUP_MSG to all participants to clean up
              1. TODO: why is this only on success and not failures as well?
              2. [improvement] Does not wait for the ACK (though one is sent); the ACK is logged asynchronously if it arrives (else timeout)
      8. On Exception
          1. fail
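
The PREPARE_MSG improvement above (retry Y times if no reply within X) could be sketched roughly as follows. This is a minimal illustration, not Cassandra code: `RetryingSender` and `sendWithRetry` are hypothetical names, the request is modeled as a `Supplier` of a `CompletableFuture`, and the sketch is only safe if the participant handles duplicate PREPARE_MSG requests idempotently.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical helper for illustration; not part of Cassandra.
final class RetryingSender
{
    /**
     * Sends a request up to maxAttempts times, waiting at most timeout for each reply.
     * Timeouts and failure replies are both treated as retryable, matching the
     * "all are retryable at this stage" note for PREPARE_MSG.
     */
    static <T> T sendWithRetry(Supplier<CompletableFuture<T>> send, Duration timeout, int maxAttempts)
    {
        if (maxAttempts < 1)
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            CompletableFuture<T> reply = send.get();
            try
            {
                return reply.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            }
            catch (TimeoutException | ExecutionException e)
            {
                reply.cancel(true); // stop waiting on this attempt before retrying
                last = e;
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for reply", e);
            }
        }
        throw new IllegalStateException("no successful reply after " + maxAttempts + " attempts", last);
    }
}
```

The retry count and timeout are left as parameters because the description does not fix values for X and Y.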
      
      Normal/Preview Repair Coordinator State
      1. For each common range
          1. ActiveRepairService.submitRepairSession
              1. Creates/run a RepairSession for each CommonRange
          2. Once all RepairSessions done
              1. [not consistent across repair types] handle session errors
              2. [Preview Repair] check SyncStatSummary for differences and send a human-readable notification
      
      Incremental Repair Coordinator State
      1. org.apache.cassandra.repair.consistent.CoordinatorSessions#registerSession to create CoordinatorSession
      2. CoordinatorSession.execute
          1. Send PREPARE_CONSISTENT_REQ and wait for PREPARE_CONSISTENT_RSP
          2. Await all success
          3. Trigger Normal Repair (see "Normal/Preview Repair Coordinator State")
          4. Send FINALIZE_PROPOSE_MSG and wait for FINALIZE_PROMISE_MSG
          5. Await all success
          6. Send FINALIZE_COMMIT_MSG
              1. [bug][improvement] No ACK is done, so if this message is dropped then some participants will think the IR is still running (which can fail preview repairs)
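
One way to address the missing ACK for FINALIZE_COMMIT_MSG is for the coordinator to track which participants have acknowledged the commit and resend to the rest. The sketch below only illustrates that bookkeeping: `CommitAckTracker` is a hypothetical class, participants are modeled as plain strings, and an ACK for FINALIZE_COMMIT_MSG does not exist in the current protocol.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical bookkeeping for an ACKed FINALIZE_COMMIT_MSG; not part of Cassandra.
final class CommitAckTracker
{
    private final Set<String> pending = ConcurrentHashMap.newKeySet();

    CommitAckTracker(Collection<String> participants)
    {
        pending.addAll(participants);
    }

    /** Records an ACK (duplicates are harmless); returns true once every participant has acknowledged. */
    boolean onAck(String participant)
    {
        pending.remove(participant);
        return pending.isEmpty();
    }

    /** Snapshot of the participants that still need FINALIZE_COMMIT_MSG resent. */
    Set<String> unacknowledged()
    {
        return new HashSet<>(pending);
    }
}
```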
      
      RepairSession
      1. [Preview Repair - kind=REPAIRED] register with LocalSessions for IR state changes
      2. RepairSession.start
          1. [Not Preview Repair] Registering the session into SystemDistributedKeyspace's table repair_history
          2. If endpoints is empty
              1. [UNHANDLED - downstream logic does not handle this case] Set future with empty state (which is later seen as Failed... but not a failed future)
              2. [Not Preview Repair] Mark session failed in repair_history
          3. Check all endpoints, if any is down and hasSkippedReplicas=false, Fail the session
          4. For each table
              1. Create a RepairJob
              2. Execute job in RepairTask's executor
              3. await all jobs
                  1. If all success
                      1. Set session result to include the job results
                  2. If any fail
                      1. Fail the session future
                      2. [Question] why does this NOT update repair_history like other failures?
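
The "await all jobs" step can be illustrated with the following sketch. It uses `java.util.concurrent.CompletableFuture` for self-containment rather than the future types RepairSession actually uses, and `SessionResults` and `awaitAll` are hypothetical names. It shows only the aggregation rule: the session result carries every job result when all succeed, and the session future fails if any job fails.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical aggregation helper; not the RepairSession implementation.
final class SessionResults
{
    /** Completes with all job results in order, or exceptionally if any job failed. */
    static <T> CompletableFuture<List<T>> awaitAll(List<CompletableFuture<T>> jobs)
    {
        CompletableFuture<List<T>> session = new CompletableFuture<>();
        CompletableFuture.allOf(jobs.toArray(new CompletableFuture<?>[0]))
                         .whenComplete((ignored, failure) -> {
                             if (failure != null)
                             {
                                 // any failed job fails the whole session future
                                 session.completeExceptionally(failure);
                                 return;
                             }
                             List<T> results = new ArrayList<>(jobs.size());
                             for (CompletableFuture<T> job : jobs)
                                 results.add(job.join()); // already complete, does not block
                             session.complete(results);
                         });
        return session;
    }
}
```

A failure callback attached to the returned future would be a natural place to record the failure in repair_history, which is the gap the [Question] item points at.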
      
      RepairJob
      1. [parallelism in [SEQUENTIAL, DATACENTER_AWARE] and not IR] send SNAPSHOT_MSG to all participants
          1. [improvement] SNAPSHOT_MSG should be idempotent so the coordinator can retry if there is no ACK.  This calls org.apache.cassandra.repair.TableRepairManager#snapshot with the RepairRunnable ID; this makes sure to snapshot only once unless force=true (RepairOptions !(dataCenters.isEmpty() and hosts.isEmpty()))
          2. [improvement] This task is short lived, so rather than running in a cached pool (which may allocate a thread), inline it or add a notion of cooperative sharing if retries are implemented
      2. Await snapshot success
      3. Send VALIDATION_REQ to all participants (all at once, or batched based off parallelism)
          1. [bug] MerkleTree may be large, even if off-heap; this can cause the coordinator to OOM (heap or direct); there is no bound on the number of MerkleTrees that may be in flight
          2. [improvement] VALIDATION_REQ could be made idempotent.  Right now we create a Validator and submit to ValidationManager, but could dedupe based off session id
      4. Await validation success
      5. Stream any/all conflicting ranges (2 modes: optimiseStreams && not pullRepair = optimisedSyncing, else standardSyncing)
          1. Create a SyncTask (Local, Asymmetric, Symmetric) for each conflicting range
              1. Local: create stream plan and use streaming
              2. Asymmetric/Symmetric: send SYNC_REQ
                  1. [bug][improvement] No ACK is done, so if this message is dropped streaming does not start on the participant
                  2. [improvement] AsymmetricRemoteSyncTask and SymmetricRemoteSyncTask are effectively the same class; they are copy/paste clones of each other, and the only difference is that AsymmetricRemoteSyncTask creates the SyncRequest with asymmetric=true
                  3. [improvement] Can be idempotent (when remote); currently just starts streaming right away, would need to dedup on the session
      6. Await streaming complete
      7. onSuccess
          1. [not preview repair] update repair_history, marking the session success
          2. Set the future as success
      8. onFailure
          1. Abort validation tasks
          2. [not preview repair] update repair_history, marking the session failed
          3. Set the future to a failure
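
The unbounded in-flight MerkleTree problem noted under VALIDATION_REQ could be bounded with a permit scheme along these lines. This is a sketch under assumptions: `InFlightTreeLimiter` is a hypothetical class, the limit is per coordinator, and a permit is held from the moment a VALIDATION_REQ is sent until the returned tree has been compared and released.

```java
import java.util.concurrent.Semaphore;

// Hypothetical limiter; not an existing Cassandra class.
final class InFlightTreeLimiter
{
    private final Semaphore permits;

    InFlightTreeLimiter(int maxInFlight)
    {
        this.permits = new Semaphore(maxInFlight);
    }

    /** Returns true if a VALIDATION_REQ may be sent now; false means the caller should defer it. */
    boolean tryStartValidation()
    {
        return permits.tryAcquire();
    }

    /** Call once the MerkleTree from a response has been compared and its memory released. */
    void onTreeReleased()
    {
        permits.release();
    }

    /** Number of additional validations that may currently be started. */
    int available()
    {
        return permits.availablePermits();
    }
}
```

A non-blocking `tryAcquire` is used so the caller can queue deferred requests instead of parking a thread; a blocking `acquire` would also work if the sending thread may wait.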
      
      
      Repair Participant State
      1. Receive PREPARE_MSG
          1. [improvement] PREPARE_MSG should be idempotent
              1. [current state] If parentRepairSessions contains the session, it ignores the request and noops; but does NOT validate that the sessions match
              2. [current state] mostly idempotent, assuming CompactionsPendingThreshold does not trigger
      2. [IR] Receive PREPARE_CONSISTENT_REQ
          1. Create LocalSession in-memory
          2. Persist LocalSession to system.repairs
          3. Create new Executor; 1 thread per table
              1. [improvement] the thread waits for org.apache.cassandra.db.repair.PendingAntiCompaction.AcquisitionCallable#acquireTuple to get called and rerun SSTables from Tracker
              2. [improvement] causes the active compactions (not Validations) to get interrupted, so if a compaction has been running for a long time and generated a lot of garbage, that work is wasted
              3. [improvement] org.apache.cassandra.db.ColumnFamilyStore#runWithCompactionsDisabled is on a single table level but could be extended for multiple tables; this would allow blocking on only 1 thread, as acquireTuple is just mutating org.apache.cassandra.db.lifecycle.Tracker
          4. Run PendingAntiCompaction
              1. Block/Interrupt compactions, to collect SSTables to AntiCompact
              2. Trigger org.apache.cassandra.db.compaction.CompactionManager#submitPendingAntiCompaction for each table
              3. Calls org.apache.cassandra.db.compaction.CompactionManager#performAnticompaction
                  1. Sets UNREPAIRED_SSTABLE for each SSTable
                  2. Group SSTables
                  3. For each group, rewrite SSTable into 3 SSTables: full (repaired), transient (repaired), other (unrepaired; out of range)
              4. On Success
                  1. Update system.repairs
                  2. Notify listeners (aka RepairSession)
                  3. Send PREPARE_CONSISTENT_RSP with body=true
              5. On Failure
                  1. Send PREPARE_CONSISTENT_RSP with body=false
                  2. Update system.repairs
      3. [parallelism in [SEQUENTIAL, DATACENTER_AWARE] and not IR] Receive SNAPSHOT_MSG
          1. Create a table snapshot using the RepairRunnable ID.  If !RepairOption.isGlobal, then override the snapshot if present
      4. Receive VALIDATION_REQ
          1. Creates a Validator and submits to CompactionManager's validationExecutor
          2. Core logic: org.apache.cassandra.repair.ValidationManager#doValidation
          3. Iterate over each partition/row, updating a MerkleTree
          4. When done, switch to the ANTI_ENTROPY stage
          5. If coordinator is remote
              1. Send a VALIDATION_RSP back with the MerkleTree (or null if failed)
          6. Else
              1. Switch to ANTI_ENTROPY again
              2. Attempt to move MerkleTree off-heap
              3. Forward message to ActiveRepairService.handleMessage
      5. [SyncTask in [AsymmetricRemoteSyncTask, SymmetricRemoteSyncTask]] Receive SYNC_REQ
          1. Creates a stream plan and uses streaming (StreamingRepairTask)
      6. [IR] Receive FINALIZE_PROPOSE_MSG
          1. Update system.repairs
          2. Force flush table
      7. [IR] Receive FINALIZE_COMMIT_MSG
          1. Update system.repairs
      8. Receive CLEANUP_MSG
          1. Removes in-memory state and disk snapshots
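
For the PREPARE_MSG idempotency item above, a participant could distinguish a true duplicate from a conflicting request that reuses the same session ID. The sketch below assumes hypothetical names (`PrepareRegistry`, `Outcome`) and reduces the session to a `String` description; the real parentRepairSessions map holds ParentRepairSession objects.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical registry illustrating an idempotent PREPARE_MSG handler; not Cassandra code.
final class PrepareRegistry
{
    enum Outcome { REGISTERED, DUPLICATE, CONFLICT }

    // session id -> description of the registered session (simplified to a String here)
    private final ConcurrentMap<UUID, String> sessions = new ConcurrentHashMap<>();

    Outcome onPrepare(UUID sessionId, String sessionDescription)
    {
        String existing = sessions.putIfAbsent(sessionId, sessionDescription);
        if (existing == null)
            return Outcome.REGISTERED; // first time this session is seen
        // A retry of the same request is a no-op; a different session under the same id is an error.
        return existing.equals(sessionDescription) ? Outcome.DUPLICATE : Outcome.CONFLICT;
    }
}
```

Returning DUPLICATE lets the participant reply with success again, which is what a retrying coordinator needs; CONFLICT covers the "does NOT validate that the sessions match" gap.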
      
      LocalSessions (IR)
      * On start, load state from system.repairs; mark active repairs as failed and send FAILED_SESSION_MSG to the coordinator
      * [every 10m; controlled by -Dcassandra.repair_cleanup_interval_seconds] for each LocalSession
          * If not updated within 1D (controlled by -Dcassandra.repair_fail_timeout_seconds), mark repair failed
          * Else if not updated within 1D (controlled by -Dcassandra.repair_delete_timeout_seconds)
              * Delete in-memory state
              * Delete system.repairs
          * Else if not updated within 1H (controlled by -Dcassandra.repair_status_check_timeout_seconds), request status update from all participants (get ConsistentSession.State)
              * Update system.repairs if any participants have state FINALIZED or FAILED
              * [bug][improvement] if Validation or Streaming is still running, this just updates the state but does not terminate active tasks
              * [improvement] if all participants are still running, we will check again in 10m
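
The periodic cleanup decision above can be read as a small decision function. The sketch below is illustrative only: `SessionCleanupPolicy` is a hypothetical class, the timeouts are hard-coded to the defaults listed above, and the `completed` flag is an assumption (not stated in the description) used to distinguish the fail and delete branches, which otherwise share the same 1D default.

```java
import java.time.Duration;

// Hypothetical decision function; not the LocalSessions implementation.
final class SessionCleanupPolicy
{
    enum Action { FAIL, DELETE, CHECK_STATUS, NONE }

    // Defaults as listed above; each is configurable through a system property.
    static final Duration FAIL_TIMEOUT = Duration.ofDays(1);
    static final Duration DELETE_TIMEOUT = Duration.ofDays(1);
    static final Duration STATUS_CHECK_TIMEOUT = Duration.ofHours(1);

    /**
     * completed is an ASSUMPTION of this sketch: true when the session already reached
     * a terminal state (FINALIZED or FAILED), false while it is still active.
     */
    static Action decide(boolean completed, Duration sinceLastUpdate)
    {
        if (!completed && sinceLastUpdate.compareTo(FAIL_TIMEOUT) >= 0)
            return Action.FAIL;
        if (completed && sinceLastUpdate.compareTo(DELETE_TIMEOUT) >= 0)
            return Action.DELETE;
        if (!completed && sinceLastUpdate.compareTo(STATUS_CHECK_TIMEOUT) >= 0)
            return Action.CHECK_STATUS;
        return Action.NONE;
    }
}
```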
      
      Streaming
      * Streaming has docs on how it works, so this section just shows some of the special casing
      * [Preview Repair] org.apache.cassandra.db.streaming.CassandraStreamManager#createOutgoingStreams filters matches based off org.apache.cassandra.streaming.PreviewKind#predicate
      * [Preview Repair] skip streaming the files, only stream the metadata
      

          People

            David Capwell (dcapwell)
