Affects Version/s: None
Fix Version/s: None
In HBase cross-site replication, every regionserver on the source side places its WALs into a replication queue and then drains that queue to the remote sink cluster. At the source cluster every regionserver participates as a source. At the sink cluster, a configurable subset of regionservers volunteers to process inbound replication RPCs.
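To make the queue model concrete, here is a deliberately simplified Java sketch of one source regionserver's queue. SourceQueueModel, enqueueWal, drainOne and depth are hypothetical names for illustration, not HBase's actual ReplicationSource code. The sketch also assumes each WAL is shipped as a single unit, whereas real replication ships batches of WAL entries. It only shows the two properties that matter here: WALs drain strictly in FIFO order, and each shipment lands on a randomly chosen sink regionserver.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Random;

/** Simplified model of one source regionserver's replication queue (hypothetical, not HBase code). */
class SourceQueueModel {
    // WAL names in roll order; the queue is drained strictly first-in, first-out.
    private final Deque<String> wals = new ArrayDeque<>();

    /** Called each time the regionserver rolls a new WAL. */
    void enqueueWal(String walName) {
        wals.addLast(walName);
    }

    /**
     * Ships the oldest WAL to a randomly chosen sink regionserver, mirroring how replication RPCs
     * are spread over the sink's volunteer regionservers. Returns (wal, sink), or null if empty.
     */
    Map.Entry<String, String> drainOne(List<String> sinkServers, Random rnd) {
        String wal = wals.pollFirst();
        if (wal == null) return null;
        return Map.entry(wal, sinkServers.get(rnd.nextInt(sinkServers.size())));
    }

    /** Number of WALs still waiting to be shipped. */
    int depth() {
        return wals.size();
    }
}
```

However many sink regionservers there are, this source still ships one WAL at a time from the head of its own queue, which is why sink-side parallelism cannot shorten a deep source queue.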
When data is highly skewed, we can take certain steps to mitigate the skew, such as pre-splitting, manual splitting, and rebalancing. These are most effective at the sink, because replication RPCs are randomly distributed over the set of receiving regionservers, so splitting on the sink side effectively redistributes the resulting writes there. On the source side we are more limited.
If writes are severely unbalanced, a regionserver's source replication queue may become very deep. Hotspotting can happen despite mitigations. Unlike on the sink side, once WALs have been enqueued at a hotspotted source, it is not possible to increase parallelism or to redistribute that work among sources. Increasing parallelism on the sink side will not help if there is a big rock (a single very deep queue) at the source, and source-side mitigations like splitting and region redistribution do nothing for deep queues that have already accumulated.
Can we redistribute source work? Yes and no. If a source regionserver fails, its queues are recovered by other regionservers. However, the regionserver that picks up a recovered queue must still process it as an atomic unit. We can move a deep queue, but we can't break it up.
Where time is of the essence, and ordering semantics are allowed to break, operators should have a recovery tool available that rescues production from the consequences of deep source queues. A very large replication queue can be split into many smaller queues, perhaps even one new queue per WAL file. These new synthetic queues can then be distributed to any or all source regionservers through the normal recovery queue assignment protocol, increasing parallelism at the source.
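A minimal sketch of the split-and-distribute idea, assuming a queue can be represented as an ordered list of WAL names. QueueSplitSketch, split, walsPerQueue and assign are hypothetical names. Round-robin assignment stands in for the normal recovery queue assignment protocol, which this sketch does not model. Setting walsPerQueue to 1 yields one synthetic queue per WAL file.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helpers illustrating how one deep queue could be split and spread out. */
class QueueSplitSketch {
    /** Splits an ordered WAL list into synthetic queues of at most walsPerQueue WALs each. */
    static List<List<String>> split(List<String> wals, int walsPerQueue) {
        if (walsPerQueue < 1) throw new IllegalArgumentException("walsPerQueue must be >= 1");
        List<List<String>> queues = new ArrayList<>();
        for (int i = 0; i < wals.size(); i += walsPerQueue) {
            // Order is preserved within a synthetic queue, but not across queues.
            queues.add(new ArrayList<>(wals.subList(i, Math.min(i + walsPerQueue, wals.size()))));
        }
        return queues;
    }

    /** Round-robin stand-in for the normal recovery queue assignment protocol. */
    static Map<String, List<List<String>>> assign(List<List<String>> queues, List<String> servers) {
        if (servers.isEmpty()) throw new IllegalArgumentException("need at least one regionserver");
        Map<String, List<List<String>>> byServer = new LinkedHashMap<>();
        for (String server : servers) byServer.put(server, new ArrayList<>());
        for (int i = 0; i < queues.size(); i++) {
            byServer.get(servers.get(i % servers.size())).add(queues.get(i));
        }
        return byServer;
    }
}
```

With walsPerQueue = 1, five WALs become five single-WAL queues, which two regionservers would receive as three and two queues respectively. Each synthetic queue still drains its own WALs in order; only the ordering across queues is given up.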
Of course, this would break serial replication semantics, and even in branch-1, which does not have that feature, it would significantly increase the probability of edits being reordered. That is an unavoidable consequence of breaking up the queue for more parallelism. As long as this is done by a separate tool invoked by operators, it is a valid option for an emergency drain, and once the drain is complete the final state will be properly ordered. Every cell in the WAL entries carries a timestamp assigned at the source, and it is applied on the sink with that timestamp. When the queue is drained and all edits have been persisted at the target, the data there will have a complete and correct temporal ordering. An operator who intends to invoke this tool must be prepared to handle intermediate misordered or reordered states. In many use cases the interim states are not important. After the recovery tool has been invoked, the point at which all edits have been transferred across clusters and persisted at the sink is when the operator would transition back into service.
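The convergence argument can be illustrated with a toy model, assuming each cell is identified by a single row/column key, timestamps are distinct, and a read returns the highest-timestamp version. It ignores deletes, version limits and equal-timestamp tie-breaking, all of which real HBase handles with additional rules. TimestampConvergenceSketch and Cell are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

/** Toy model: a sink that keeps, per cell key, the version with the highest source timestamp. */
class TimestampConvergenceSketch {
    /** A replicated cell: key (row/family/qualifier), source-assigned timestamp, and value. */
    static final class Cell {
        final String key;
        final long ts;
        final String value;

        Cell(String key, long ts, String value) {
            this.key = key;
            this.ts = ts;
            this.value = value;
        }
    }

    /** Applies cells in the given order and returns what a latest-version read would see. */
    static Map<String, String> apply(List<Cell> cells) {
        Map<String, Cell> latest = new HashMap<>();
        for (Cell c : cells) {
            Cell current = latest.get(c.key);
            // Arrival order is irrelevant: only a higher timestamp replaces the stored version.
            if (current == null || c.ts > current.ts) latest.put(c.key, c);
        }
        Map<String, String> view = new HashMap<>();
        latest.forEach((key, cell) -> view.put(key, cell.value));
        return view;
    }

    public static void main(String[] args) {
        List<Cell> sourceOrder = List.of(
            new Cell("r1:cf:a", 100L, "v1"),
            new Cell("r2:cf:a", 150L, "x"),
            new Cell("r1:cf:a", 200L, "v2"));
        List<Cell> reordered = new ArrayList<>(sourceOrder);
        Collections.shuffle(reordered, new Random(7)); // simulate out-of-order delivery from split queues
        System.out.println(apply(sourceOrder).equals(apply(reordered))); // prints true: same final state
    }
}
```

Only the interim states differ. Midway through a reordered drain, the sink can show r1:cf:a = v2 (timestamp 200) while r2:cf:a (timestamp 150) is still missing, a combination that never existed at the source.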
As a strawman, we propose these work items:
- Add a replication admin command that can reassign a replication queue away from an active source. The active source creates a new queue and continues. The previously active queue can be assigned to another regionserver as a recovery queue, or left unassigned (e.g. target = null).
- Administratively unassigned recovery queues should not be automatically processed, but must be discoverable.
- Add a replication admin command that transitions an unassigned replication queue into an active and eligible recovery queue.
- Create a tool that uses these new APIs to take control of a (presumably deep) replication queue, break the queue up into its constituent WAL files, create new synthetic queues according to a configurable, parameterized grouping function, and make the new synthetic queues eligible for recovery. The original queue retains one group, as defined by the grouping policy, and is itself made re-eligible for recovery.
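The work items above could fit together roughly as follows. This is a sketch under stated assumptions: ReplicationQueueAdmin and all of its methods are hypothetical stand-ins for the proposed admin commands and do not exist in HBase, InMemoryAdmin is a toy implementation so the example runs, and perWal is just one possible grouping function. In this sketch the original queue keeps the first group.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical admin surface for the proposed work items; these methods do not exist in HBase. */
interface ReplicationQueueAdmin {
    /** Takes a queue away from its current owner; a null target leaves it unassigned. */
    void reassignQueue(String queueId, String targetServer);
    /** Unassigned queues are not processed automatically but must be discoverable. */
    Set<String> listUnassignedQueues();
    List<String> getWals(String queueId);
    void setWals(String queueId, List<String> wals);
    /** Creates a new queue that starts out unassigned and not eligible for recovery. */
    String createUnassignedQueue(List<String> wals);
    /** Transitions an unassigned queue into an eligible recovery queue. */
    void enableRecovery(String queueId);
}

/** Toy in-memory implementation so the sketch can run; not a real queue store. */
class InMemoryAdmin implements ReplicationQueueAdmin {
    final Map<String, List<String>> queues = new LinkedHashMap<>();
    final Set<String> unassigned = new LinkedHashSet<>();
    final Set<String> recoverable = new LinkedHashSet<>();
    private int nextId = 0;

    public void reassignQueue(String queueId, String targetServer) {
        if (targetServer == null) unassigned.add(queueId); else unassigned.remove(queueId);
    }
    public Set<String> listUnassignedQueues() { return unassigned; }
    public List<String> getWals(String queueId) { return new ArrayList<>(queues.get(queueId)); }
    public void setWals(String queueId, List<String> wals) { queues.put(queueId, new ArrayList<>(wals)); }
    public String createUnassignedQueue(List<String> wals) {
        String id = "synthetic-" + nextId++;
        queues.put(id, new ArrayList<>(wals));
        unassigned.add(id);
        return id;
    }
    public void enableRecovery(String queueId) {
        unassigned.remove(queueId);
        recoverable.add(queueId);
    }
}

/** Hypothetical queue-splitting tool built on the admin surface above. */
class QueueSplitTool {
    private final ReplicationQueueAdmin admin;
    private final Function<List<String>, List<List<String>>> grouping;

    QueueSplitTool(ReplicationQueueAdmin admin, Function<List<String>, List<List<String>>> grouping) {
        this.admin = admin;
        this.grouping = grouping;
    }

    /** One possible grouping policy: one synthetic queue per WAL file. */
    static List<List<String>> perWal(List<String> wals) {
        List<List<String>> groups = new ArrayList<>();
        for (String wal : wals) groups.add(List.of(wal));
        return groups;
    }

    /** Splits queueId and returns every queue made eligible for recovery, original first. */
    List<String> split(String queueId) {
        admin.reassignQueue(queueId, null); // detach; the active source starts a fresh queue
        List<List<String>> groups = grouping.apply(admin.getWals(queueId));
        List<String> ids = new ArrayList<>();
        ids.add(queueId);
        // Create the synthetic queues unassigned, so nothing is processed yet.
        for (int i = 1; i < groups.size(); i++) ids.add(admin.createUnassignedQueue(groups.get(i)));
        // The original queue keeps the first group (or nothing, if the queue was empty).
        admin.setWals(queueId, groups.isEmpty() ? List.<String>of() : groups.get(0));
        for (String id : ids) admin.enableRecovery(id);
        return ids;
    }
}
```

The synthetic queues are created unassigned and the original queue is trimmed before anything is made eligible, so no WAL is ever eligible in two queues at once. A real tool would also need to survive a crash part way through, which this sketch does not address.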