Details
- Type: Bug
- Status: Closed
- Priority: Blocker
- Resolution: Fixed
- Affects Version/s: 1.12.2
Description
NOTE: This bug has not actually been observed. It is based on a review of the current implementation.
I would expect it to be a pretty rare case, but at scale, even the rare cases happen often enough.
Problem
Intermediate RPC messages from JM to TM can get dropped, even when the TM is not marked as failed.
That can happen when the connection recovers before the heartbeat times out.
RPCs therefore generally retry or handle failures: for example, the Deploy-Task RPC retries, and the Trigger-Checkpoint RPC aborts the checkpoint on failure and triggers a new one.
The "Send OperatorEvent" RPC call (from Coordinator to Operator) gives you a Future with the acknowledgement. But if that Future fails, we are in the situation where we do not know whether sending the event was successful or not (only the ack failed).
This is especially tricky for split assignments and checkpoints. Consider this sequence of actions:
1. Coordinator assigns a split. Ack not yet received.
2. Coordinator takes a checkpoint. The split was sent before the checkpoint, so it is not included in the Coordinator's checkpointed state.
3. Split assignment RPC response is "failed".
4. Checkpoint completes.
Now we don't know whether the split was in the Operator's (TaskManager's) checkpoint or not, and thus we don't know whether we should add it back to the coordinator. We need to make sure the split ends up either on the coordinator or on the Operator. Currently, the split is implicitly assumed to be on the Operator; if it isn't, then that split is lost.
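To make the race concrete, here is a minimal, self-contained sketch in Java. The coordinator state and the "RPC" are simulated in plain code; none of the names correspond to actual Flink classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Self-contained sketch of the race described above; all names are illustrative.
public class SplitLossSketch {

    public static void main(String[] args) {
        List<String> unassignedSplits = new ArrayList<>(List.of("split-0"));

        // 1. The coordinator sends the split; the ack future is still pending.
        CompletableFuture<Void> ack = new CompletableFuture<>();
        String sentSplit = unassignedSplits.remove(0); // no longer part of coordinator state

        // 2. A checkpoint is taken; the coordinator snapshot no longer contains the split.
        List<String> coordinatorSnapshot = new ArrayList<>(unassignedSplits); // empty

        // 3. The split-assignment RPC reports a failure.
        ack.completeExceptionally(new RuntimeException("connection to TaskManager lost"));

        // 4. The checkpoint completes anyway. If the event never reached the operator,
        //    the split is in neither the coordinator's nor the operator's checkpointed
        //    state and is lost on recovery.
        System.out.println("Coordinator snapshot: " + coordinatorSnapshot);
        System.out.println("Sent but unacknowledged: " + sentSplit
                + " (may or may not be on the operator)");
    }
}
```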
Now, it is worth pointing out that this is a pretty rare situation, because it means that the RPC with the split assignment fails while the one for the checkpoint succeeds, even though they happen in close proximity. The way the Akka-based RPC transport works (with retries, etc.), this can happen, but it isn't very likely. That is why we haven't seen this bug in practice so far, or haven't gotten a report for it yet.
Proposed solution
The solution has two components:
1. Fallback to a consistent point: If the system doesn't know whether two parts are still consistent with each other (here the coordinator and the Operator), fall back to a consistent point. Here that is the case when the Ack-Future for the "Send Operator Event" RPC fails or times out. We then call the scheduler to trigger a failover of the target operator to the latest checkpoint and signal the same to the coordinator (sketched below). That restores consistency. We can later optimize this (see below).
2. We cannot trigger checkpoints while we are "in limbo" concerning our knowledge about splits. Concretely, that means the Coordinator can only acknowledge the checkpoint once the Acks for pending Operator Event RPCs (Assign-Splits) have arrived. The checkpoint future is conditional on all pending RPC futures. If the RPC futures fail (or time out), the checkpoint cannot complete (and the target operator will go through a failover anyway). In the common case, the RPC round-trip time is milliseconds, which would be added to the checkpoint latency only if the checkpoint happens to overlap with a split assignment (most won't).
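Below is a minimal sketch of how the two components could fit together. The Scheduler and Coordinator interfaces and all method names are hypothetical placeholders for illustration, not the actual Flink scheduler or OperatorCoordinator APIs.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustrative sketch of the two components; all interfaces and names are hypothetical.
public class ConsistentEventSendingSketch {

    interface Scheduler {
        // Restarts the target subtask from the latest completed checkpoint.
        void triggerSubtaskFailover(int subtask);
    }

    interface Coordinator {
        // Resets the coordinator's view of the subtask to the same checkpoint.
        void subtaskReset(int subtask, long checkpointId);
    }

    // Component (1): if the ack for an operator event fails or times out, fall back to a
    // consistent point by failing the subtask over and resetting the coordinator.
    static void sendWithFallback(
            CompletableFuture<Void> ackFuture,
            Scheduler scheduler,
            Coordinator coordinator,
            int subtask,
            long latestCheckpointId) {

        ackFuture.whenComplete((ack, failure) -> {
            if (failure != null) {
                scheduler.triggerSubtaskFailover(subtask);
                coordinator.subtaskReset(subtask, latestCheckpointId);
            }
        });
    }

    // Component (2): the coordinator's checkpoint only completes after all pending
    // event-send acks have arrived; if any ack fails, the checkpoint fails too.
    static CompletableFuture<byte[]> checkpointCoordinator(
            List<CompletableFuture<Void>> pendingEventAcks,
            byte[] coordinatorStateSnapshot) {

        CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
        CompletableFuture
                .allOf(pendingEventAcks.toArray(new CompletableFuture[0]))
                .whenComplete((ignored, failure) -> {
                    if (failure != null) {
                        // An ack failed or timed out: the checkpoint cannot complete, and
                        // the target operator goes through a failover anyway (component 1).
                        checkpointFuture.completeExceptionally(failure);
                    } else {
                        // All events sent before the checkpoint are known to be on the
                        // operator, so the coordinator state is consistent with it.
                        checkpointFuture.complete(coordinatorStateSnapshot);
                    }
                });
        return checkpointFuture;
    }
}
```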
Possible Future Improvements
Step (1) above can be optimized by first retrying, with sequence numbers to deduplicate the calls. That can help reduce the number of cases where a failover is needed. However, the number of situations where the RPC would need a retry and has a chance of succeeding (the TM is not down) should be very few to begin with, so whether this optimization is worth it remains to be seen.
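For illustration, a hedged sketch of such a deduplication scheme, with entirely hypothetical names: the sender tags each event with a monotonically increasing sequence number, and the receiving side ignores anything it has already applied, so a retried send after a lost ack is harmless.

```java
// Hypothetical sketch of sequence-number-based deduplication on the receiving side;
// not an existing Flink class.
public class DeduplicatingEventHandler {

    private long lastAppliedSequenceNumber = -1L;

    /** Applies the event only if its sequence number has not been seen yet. */
    public synchronized boolean onEvent(long sequenceNumber, Object event) {
        if (sequenceNumber <= lastAppliedSequenceNumber) {
            // Duplicate caused by a retry after a lost ack; the event was already applied.
            return false;
        }
        lastAppliedSequenceNumber = sequenceNumber;
        applyToOperator(event);
        return true;
    }

    private void applyToOperator(Object event) {
        // Hand the event to the operator (placeholder).
    }
}
```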
Attachments
Issue Links
- causes
  - FLINK-23233 OperatorEventSendingCheckpointITCase.testOperatorEventLostWithReaderFailure fails on azure (Closed)
  - FLINK-22417 OrcFileSystemITCase.testOrcFilterPushDown fails on AZP (Closed)
  - FLINK-22420 UnalignedCheckpointITCase failed (Closed)
- duplicates
  - FLINK-20254 HiveTableSourceITCase.testStreamPartitionReadByCreateTime times out (Closed)
  - FLINK-22100 BatchFileSystemITCaseBase.testPartialDynamicPartition fail because of no output for 900 seconds (Closed)
  - FLINK-22129 OrcFileSystemITCase hangs on azure (Closed)
- is blocked by
  - FLINK-18071 CoordinatorEventsExactlyOnceITCase.checkListContainsSequence fails on CI (Closed)
- is related to
  - FLINK-22129 OrcFileSystemITCase hangs on azure (Closed)
- relates to
  - FLINK-32751 DistinctAggregateITCaseBase.testMultiDistinctAggOnDifferentColumn got stuck on AZP (Resolved)
  - FLINK-20254 HiveTableSourceITCase.testStreamPartitionReadByCreateTime times out (Closed)
  - FLINK-22129 OrcFileSystemITCase hangs on azure (Closed)
  - FLINK-22345 CoordinatorEventsExactlyOnceITCase hangs on azure (Closed)
  - FLINK-22397 OrcFileSystemITCase fails on Azure (Closed)
  - FLINK-22415 JsonBatchFileSystemITCase fails on Azure (Closed)
  - FLINK-22420 UnalignedCheckpointITCase failed (Closed)