Details
Type: Improvement
Status: Reopened
Priority: Not a Priority
Resolution: Unresolved
Affects Version/s: 1.11.0, 1.12.0
Fix Version/s: None
Description
If an operator throws an exception, the processing loop is exited and all operators are disposed. However, if Function.close blocks during this cleanup, the task state never switches to FAILED, so the JobMaster cannot learn the final state and trigger a restart.
The Task has a TaskCancelerWatchDog that kills the process if cancellation exceeds the timeout, but it does not apply to a task that has failed. The task thread always hangs at:
org.apache.flink.streaming.runtime.tasks.StreamTask#cleanUpInvoke
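To make the asymmetry concrete, here is a minimal, hypothetical Java model built on plain JDK threads (it is not Flink code; `CleanupHangSketch`, `simulate` and `Outcome` are invented for illustration). It assumes, as described above, that the final state is only published after cleanup returns and that a watchdog is armed only when cancellation is requested. It deliberately ignores thread interruption, which the real Task also uses during cancellation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of the hang described in this issue; not Flink code.
public class CleanupHangSketch {

    // FAILED_REPORTED would be the healthy outcome; it cannot occur while cleanup blocks.
    public enum Outcome { FAILED_REPORTED, STILL_RUNNING, WATCHDOG_FIRED }

    // Starts a "task thread" that has already failed and now blocks forever in cleanup.
    // If cancelRequested is true, a watchdog waits watchdogMillis for the task thread to exit.
    // Returns what an observer sees after at most observeMillis.
    public static Outcome simulate(boolean cancelRequested, long watchdogMillis, long observeMillis) {
        AtomicReference<String> reportedState = new AtomicReference<>("RUNNING");
        CountDownLatch neverReleased = new CountDownLatch(1);
        CountDownLatch watchdogFired = new CountDownLatch(1);

        Thread taskThread = new Thread(() -> {
            try {
                neverReleased.await();       // stands in for a close() that never returns
                reportedState.set("FAILED"); // only reached once cleanup has finished
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        taskThread.setDaemon(true);
        taskThread.start();

        if (cancelRequested) {
            // Cancellation path only: a watchdog bounds how long the task thread may take.
            Thread watchdog = new Thread(() -> {
                try {
                    taskThread.join(watchdogMillis);
                    if (taskThread.isAlive()) {
                        watchdogFired.countDown(); // the real watchdog would kill the TaskManager here
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            watchdog.setDaemon(true);
            watchdog.start();
        }

        try {
            if (watchdogFired.await(observeMillis, TimeUnit.MILLISECONDS)) {
                return Outcome.WATCHDOG_FIRED;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "FAILED".equals(reportedState.get()) ? Outcome.FAILED_REPORTED : Outcome.STILL_RUNNING;
    }

    public static void main(String[] args) {
        System.out.println("failure path:      " + simulate(false, 100, 500));
        System.out.println("cancellation path: " + simulate(true, 100, 500));
    }
}
```

Under these assumptions the failure path reports `STILL_RUNNING` no matter how long you observe it, while the cancellation path reports `WATCHDOG_FIRED` once the watchdog timeout elapses.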
Test case:
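import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

Configuration configuration = new Configuration();
configuration.set(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 10000L);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2, configuration);
env.addSource(...) // source elided in the original report
    .process(new ProcessFunction<String, String>() {
        @Override
        public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
            // Subtask 0 fails on its first element.
            if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
                throw new RuntimeException();
            }
        }

        @Override
        public void close() throws Exception {
            // Subtask 0 then blocks in close(), so its task never reaches FAILED.
            if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
                Thread.sleep(10000000);
            }
        }
    }).setParallelism(2)
    .print();
env.execute();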
In this case, the job blocks in the close() call and never transitions to FAILED.
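If the sleep is moved to the subtask with subtaskIndex == 1 instead (subtask 0 still throws), the TaskManager exits after TASK_CANCELLATION_TIMEOUT: subtask 0 then reaches FAILED, subtask 1 is cancelled, and the cancellation watchdog fires while subtask 1 is blocked in close().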
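Since the watchdog only bounds cancellation, one possible direction is to bound cleanup after a failure in the same way. The plain-Java sketch below is an assumption for illustration, not how Flink implements it: it runs a cleanup step on a separate daemon thread and stops waiting after a timeout, so the caller can report the failure or escalate instead of hanging. `BoundedCleanupSketch` and `runCleanupWithTimeout` are hypothetical names.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper illustrating a bounded cleanup step; not a Flink API.
public class BoundedCleanupSketch {

    // Runs cleanup on a separate daemon thread and waits at most timeoutMillis.
    // Returns true if cleanup terminated (normally or with an exception), false if it is still running.
    public static boolean runCleanupWithTimeout(Runnable cleanup, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "bounded-cleanup");
            t.setDaemon(true); // a stuck cleanup thread must not keep the JVM alive
            return t;
        });
        try {
            Future<?> result = executor.submit(cleanup);
            result.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (ExecutionException e) {
            return true; // cleanup threw, but it did terminate
        } catch (TimeoutException e) {
            return false; // cleanup is stuck: the caller should escalate instead of waiting forever
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow(); // interrupts the cleanup thread if it is still running
        }
    }

    public static void main(String[] args) {
        boolean quick = runCleanupWithTimeout(() -> { }, 1_000);
        boolean stuck = runCleanupWithTimeout(() -> {
            try {
                Thread.sleep(10_000_000); // mirrors the blocking close() in the test case
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, 100);
        System.out.println("quick cleanup finished: " + quick); // true
        System.out.println("stuck cleanup finished: " + stuck); // false
    }
}
```

A stuck cleanup still occupies a thread here; the sketch merely interrupts and abandons it, which only helps if the blocked code responds to interruption. That limitation is why the existing watchdog escalates to killing the process.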
Issue Links
- is duplicated by
  - FLINK-20649 StreamTask closeAllOperators stuck (Closed)
- is related to
  - FLINK-4714 Set task state to RUNNING after state has been restored (Closed)
  - FLINK-17012 Expose stage of task initialization (Closed)