During the reconciling period, the ExecutionGraph structure is recovered from TaskManager reports. The necessary information includes:
- Execution: ExecutionAttemptID, AttemptNumber, StartTimestamp, ExecutionState, SimpleSlot, PartialInputChannelDeploymentDescriptor (Consumer Execution)
- ExecutionVertex: Map<IntermediateResultPartitionID, IntermediateResultPartition>
- ExecutionGraph: ConcurrentHashMap<ExecutionAttemptID, Execution>
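A minimal sketch of how the JobMaster might hold the reported per-Execution information during reconciliation. It assumes hypothetical classes `ReconcileModel` and `ExecutionReport` and replaces the real Flink types (ExecutionAttemptID, SimpleSlot, IntermediateResultPartitionID) with plain strings. The consumer information carried by PartialInputChannelDeploymentDescriptor is not modelled, and the partition map is simplified to a single graph-wide map instead of one map per ExecutionVertex.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ReconcileModel {

    // Hypothetical per-Execution report; real Flink types are replaced by strings.
    static final class ExecutionReport {
        final String attemptId;          // ExecutionAttemptID
        final int attemptNumber;         // AttemptNumber
        final long startTimestamp;       // StartTimestamp
        final String reportedState;      // ExecutionState, applied only in step two
        final String slotId;             // stands in for SimpleSlot
        final List<String> partitionIds; // partitions produced by this attempt

        ExecutionReport(String attemptId, int attemptNumber, long startTimestamp,
                        String reportedState, String slotId, List<String> partitionIds) {
            this.attemptId = attemptId;
            this.attemptNumber = attemptNumber;
            this.startTimestamp = startTimestamp;
            this.reportedState = reportedState;
            this.slotId = slotId;
            this.partitionIds = List.copyOf(partitionIds);
        }
    }

    // Plays the role of ExecutionGraph's ConcurrentHashMap<ExecutionAttemptID, Execution>.
    private final Map<String, ExecutionReport> executions = new ConcurrentHashMap<>();

    // Simplified, graph-wide stand-in for the per-ExecutionVertex partition map:
    // partition ID -> attempt ID of the producing execution.
    private final Map<String, String> partitionProducers = new ConcurrentHashMap<>();

    void register(ExecutionReport report) {
        executions.put(report.attemptId, report);
        for (String partitionId : report.partitionIds) {
            partitionProducers.put(partitionId, report.attemptId);
        }
    }

    ExecutionReport lookup(String attemptId) { return executions.get(attemptId); }

    String producerOf(String partitionId) { return partitionProducers.get(partitionId); }

    int size() { return executions.size(); }

    public static void main(String[] args) {
        ReconcileModel model = new ReconcileModel();
        model.register(new ExecutionReport("attempt-1", 0, 1_000L, "RUNNING", "slot-1",
                List.of("partition-a")));
        System.out.println(model.producerOf("partition-a")); // prints attempt-1
    }
}
```

Keying both maps by ID mirrors the lookup pattern the ExecutionGraph already uses, so later reports and RPCs can resolve an attempt or a partition in constant time.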
For the RECONCILING ExecutionState, each task should transition into one of the existing task states (RUNNING, CANCELED, FAILED, FINISHED). This requires the TaskManager to retain the terminal state of a task (CANCELED, FAILED, FINISHED) for some time after the task ends; we plan to implement this mechanism in a separate JIRA issue. In addition, each state transition triggers different actions, and some of these actions rely on the information listed above. Because of this dependency, the recovery process is divided into two steps:
- First, recover all necessary information except the ExecutionState.
- Second, transition each ExecutionState into the actual task state and trigger the corresponding actions. The behavior is the same as the current handling of UpdateTaskExecutionState.
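The reason for splitting recovery into two steps can be sketched as follows. The class `TwoPhaseRecovery`, its `Report` type, and the actions in `triggerAction` are hypothetical simplifications, not Flink's actual UpdateTaskExecutionState handling. Step one registers every execution while keeping it in RECONCILING; step two applies the reported states, so an action that looks up another execution (here, notifying consumers of a FINISHED producer) never sees a missing entry, regardless of report order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TwoPhaseRecovery {

    enum ExecutionState { RECONCILING, RUNNING, FINISHED, CANCELED, FAILED }

    // Hypothetical report: attempt ID, the state the TaskManager observed,
    // and the attempt IDs of executions consuming this attempt's output.
    static final class Report {
        final String attemptId;
        final ExecutionState reportedState;
        final List<String> consumers;

        Report(String attemptId, ExecutionState reportedState, List<String> consumers) {
            this.attemptId = attemptId;
            this.reportedState = reportedState;
            this.consumers = List.copyOf(consumers);
        }
    }

    final Map<String, Report> reports = new LinkedHashMap<>();
    final Map<String, ExecutionState> states = new LinkedHashMap<>();
    final List<String> actions = new ArrayList<>();

    void recover(List<Report> allReports) {
        // Step 1: rebuild the structure only; every execution stays RECONCILING.
        for (Report r : allReports) {
            reports.put(r.attemptId, r);
            states.put(r.attemptId, ExecutionState.RECONCILING);
        }
        // Step 2: apply reported states and trigger actions. Every execution
        // referenced by an action is already registered at this point.
        for (Report r : allReports) {
            states.put(r.attemptId, r.reportedState);
            triggerAction(r);
        }
    }

    // Hypothetical actions standing in for the real state-transition handling.
    private void triggerAction(Report r) {
        switch (r.reportedState) {
            case FINISHED:
                for (String consumer : r.consumers) {
                    if (!reports.containsKey(consumer)) {
                        throw new IllegalStateException("unknown consumer " + consumer);
                    }
                    actions.add("notify " + consumer);
                }
                break;
            case FAILED:
                actions.add("fail job due to " + r.attemptId);
                break;
            default:
                break; // RUNNING and CANCELED trigger nothing in this sketch
        }
    }

    public static void main(String[] args) {
        TwoPhaseRecovery recovery = new TwoPhaseRecovery();
        // The producer is reported before its consumer; a single pass that
        // triggered actions immediately would throw IllegalStateException.
        recovery.recover(List.of(
                new Report("source", ExecutionState.FINISHED, List.of("sink")),
                new Report("sink", ExecutionState.RUNNING, List.of())));
        System.out.println(recovery.actions); // prints [notify sink]
    }
}
```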
To keep the logic simple and consistent, the JobMaster temporarily refuses all other RPC messages from TaskManagers (UpdateTaskExecutionState, ScheduleOrUpdateConsumers, etc.) during the recovery period and responds to them with a special message. The TaskManager should then retry sending these messages until the JobMaster finishes recovery and acknowledges them.
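The refuse-and-retry protocol can be sketched in a single process as below. `RecoveryGate`, `JobMasterStub`, the `Ack.RETRY_LATER` reply, and `sendWithRetry` are hypothetical names; the special message is modelled as an enum value, and the `betweenAttempts` callback stands in for whatever backoff a real TaskManager would use. The retry loop is bounded by `maxAttempts`, which is an assumption added so the sketch always terminates.

```java
import java.util.ArrayList;
import java.util.List;

public class RecoveryGate {

    // Hypothetical reply codes; RETRY_LATER models the JobMaster's special message.
    enum Ack { ACCEPTED, RETRY_LATER }

    // JobMaster side: refuse TaskManager RPCs while the job is reconciling.
    static final class JobMasterStub {
        private boolean recovering = true;
        final List<String> handled = new ArrayList<>();

        synchronized Ack updateTaskExecutionState(String message) {
            if (recovering) {
                return Ack.RETRY_LATER; // refused, the TaskManager must resend
            }
            handled.add(message);
            return Ack.ACCEPTED;
        }

        synchronized void finishRecovery() { recovering = false; }
    }

    // TaskManager side: resend until accepted or maxAttempts is exhausted.
    // Returns the number of attempts used, or -1 if never accepted.
    static int sendWithRetry(JobMasterStub jobMaster, String message, int maxAttempts,
                             Runnable betweenAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (jobMaster.updateTaskExecutionState(message) == Ack.ACCEPTED) {
                return attempt;
            }
            betweenAttempts.run(); // a real TaskManager would back off here
        }
        return -1;
    }

    public static void main(String[] args) {
        JobMasterStub jobMaster = new JobMasterStub();
        int[] waits = {0};
        // Recovery finishes after the second refused attempt.
        int used = sendWithRetry(jobMaster, "task-1 FINISHED", 10,
                () -> { if (++waits[0] == 2) jobMaster.finishRecovery(); });
        System.out.println(used); // prints 3
    }
}
```

Refusing instead of queueing on the JobMaster side keeps the reconciliation state machine free of interleaved updates; the cost is that each TaskManager must buffer and resend its own messages.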
For the RECONCILING JobStatus, the job transitions into one of the following states (RUNNING, FAILING, FINISHED) after recovery:
- RECONCILING to RUNNING: All TaskManagers report within the timeout and all tasks are in RUNNING state.
- RECONCILING to FAILING: Any TaskManager fails to report in time, or any task is in FAILED or CANCELED state.
- RECONCILING to FINISHED: All TaskManagers report within the timeout and all tasks are in FINISHED state.
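The three transition rules can be expressed as one decision function. This is a sketch under stated assumptions: `ReconcileOutcome` and `decide` are hypothetical, the set of TaskManagers that reported before the deadline is assumed to be computed elsewhere, and the rules above do not specify what happens when tasks are a mix of RUNNING and FINISHED, so the sketch assumes RUNNING for that case.

```java
import java.util.Collection;
import java.util.List;
import java.util.Set;

public class ReconcileOutcome {

    enum TaskState { RUNNING, FINISHED, CANCELED, FAILED }

    enum JobStatus { RUNNING, FAILING, FINISHED }

    // expected: TaskManagers that must report; reportedInTime: those that reported
    // before the (hypothetical) deadline; taskStates: task states after step two.
    static JobStatus decide(Set<String> expected, Set<String> reportedInTime,
                            Collection<TaskState> taskStates) {
        if (!reportedInTime.containsAll(expected)) {
            return JobStatus.FAILING; // some TaskManager did not report in time
        }
        boolean allFinished = true;
        for (TaskState state : taskStates) {
            if (state == TaskState.FAILED || state == TaskState.CANCELED) {
                return JobStatus.FAILING;
            }
            allFinished &= state == TaskState.FINISHED;
        }
        if (allFinished) {
            return JobStatus.FINISHED;
        }
        // Remaining tasks are all RUNNING, or a mix of RUNNING and FINISHED.
        // The mixed case is not covered by the rules above; this sketch assumes RUNNING.
        return JobStatus.RUNNING;
    }

    public static void main(String[] args) {
        Set<String> taskManagers = Set.of("tm-1", "tm-2");
        System.out.println(decide(taskManagers, Set.of("tm-1"),
                List.of(TaskState.RUNNING))); // prints FAILING
    }
}
```

Checking the TaskManager reports before the task states reflects the ordering of the rules: a missing report already forces FAILING, whatever the reported tasks look like.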