NOTE: This bug has not been actually observed. It is based on reviews of the current implementation.
I would expect it to be a pretty rare case, bu at scale, even the rare cases happen often enough.
Intermediate RPC messages from JM to TM can get dropped, even when the TM is not marked as failed.
That can happen when the connection can be recovered before the heartbeat times out.
So RPCs generally retry, or handle failures: For example Deploy-Task-RPC retries, Trigger-Checkpoint RPC aborts the checkpoint on failure and triggers a new checkpoint.
The "Send OperatorEvent" RPC call (from Coordinator to Operator) gives you a Future with the acknowledgement. But if that one fails, we are in the situation where we do not know whether the event sending was successful or not (only the ack failed).
This is especially tricky for split assignments and checkpoints. Consider this sequence of actions:
1. Coordinator assigns a split. Ack not yet received.
2. Coordinator takes a checkpoint. Split was sent before the checkpoint, so is not included on the Coordinator.
3. Split assignment RPC response is "failed".
4. Checkpoint completes.
Now we don't know whether the split was in the checkpoint on the Operator (TaskManager) or not, and with that we don't know whether we should add it back to the coordinator. We need to do something to make sure the split is now either on the coordinator or on the Operator. Currently, the split is implicitly assumed to be on the Operator; if it isn't, then that split is lost.
Not, it is worth pointing out that this is a pretty rare situation, because it means that the RPC with the split assignment fails and the one for the checkpoint succeeds, even though they are in close proximity. The way the Akka-based RPC transport works (with retries, etc.), this can happen, but isn't very likely. That why we haven't so far seen this bug in practice or haven't gotten a report for it, yet.
The solution has two components:
1. Fallback to consistent point: If the system doesn't know whether two parts are still consistent with each other (here coordinator and Operator), fall back to a consistent point. Here that is the case when the Ack-Future for the "Send Operator Event" RPC fails or times out. Then we call the scheduler to trigger a failover of the target operator to latest checkpoint and signaling the coordinator the same. That restores consistency. We can later optimize this (see below).
2. We cannot trigger checkpoints while we are "in limbo" concerning our knowledge about splits. Concretely that means that the Coordinator can only acknowledge the checkpoint once the Acks for pending Operator Event RPCs (Assign-Splits) have arrived. The checkpoint future is conditional on all pending RPC futures. If the RPC futures fail (or time out) then the checkpoint cannot complete (and the target operator will anyways go through a failover). In the common case, RPC round trip time is milliseconds, which would be added to the checkpoint latency if the checkpoint happends to overlap with a split assignment (most won't).
Step (1) above can be optimized by going with retries first and sequence numbers to deduplicate the calls. That can help reduce the number of cases were a failover is needed. However, the number of situations where the RPC would need a retry and has a chance of succeeding (the TM is not down) should be very few to begin with, so whether this optimization is worth it remains to be seen.