FLINK-18983

Job doesn't change to FAILED if the close function is blocked


Details

    Description

      If an operator throws an exception, the task breaks out of its processing loop and disposes all operators. However, if Function.close blocks during that cleanup, the task state never switches to FAILED, so the JobMaster never learns the final state and cannot restart the job.

      A Task has a TaskCancelerWatchDog that kills the process if cancellation times out, but it does not take effect for a task that is failing rather than being canceled. The task thread will hang indefinitely at:
      org.apache.flink.streaming.runtime.tasks.StreamTask#cleanUpInvoke
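
      The watchdog idea mentioned above can be illustrated with plain Java threads. This is only a sketch of the general pattern, not Flink's TaskCancelerWatchDog; the class name WatchdogSketch, the method watch, and the onTimeout callback are hypothetical. It shows why the hang goes unnoticed: a watchdog only helps if something arms it, and according to this report nothing arms one when cleanup blocks after a failure.

```java
/** Hypothetical sketch, not Flink code: a watchdog that fires if a worker thread outlives a timeout. */
public class WatchdogSketch {

    /**
     * Waits up to timeoutMillis for the worker to finish. If the worker is still
     * alive afterwards, runs onTimeout and returns true; otherwise returns false.
     * timeoutMillis must be greater than 0, because Thread.join(0) waits forever.
     */
    public static boolean watch(Thread worker, long timeoutMillis, Runnable onTimeout)
            throws InterruptedException {
        worker.join(timeoutMillis);
        if (worker.isAlive()) {
            onTimeout.run();
            return true;
        }
        return false;
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for a task thread stuck in a blocking close().
        Thread stuck = new Thread(() -> {
            try {
                Thread.sleep(Long.MAX_VALUE);
            } catch (InterruptedException ignored) {
                // Interrupted by the demo below; just exit.
            }
        });
        stuck.setDaemon(true);
        stuck.start();

        boolean fired = watch(stuck, 200,
                () -> System.out.println("timeout reached: a real watchdog would escalate here"));
        System.out.println("fired=" + fired);
        stuck.interrupt();
    }
}
```

      In this sketch the escalation is just a callback; what a real implementation does on timeout (for example, terminating the process) is outside the scope of the example.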

      Test case:

      Configuration configuration = new Configuration();
      // Cancellation timeout in milliseconds, after which the watchdog should kill the process.
      configuration.set(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 10000L);
      StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2, configuration);
      env.addSource(...)
      	.process(new ProcessFunction<String, String>() {
      		@Override
      		public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
      			// Subtask 0 fails on its first element.
      			if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
      				throw new RuntimeException();
      			}
      		}
      
      		@Override
      		public void close() throws Exception {
      			// Subtask 0 then blocks in close(), so its cleanup never completes.
      			if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
      				Thread.sleep(10000000);
      			}
      		}
      	}).setParallelism(2)
      	.print();
      env.execute();
      

      In this case, the job blocks in close and never changes to FAILED.
      If the sleep is instead performed by the subtask with index 1 (the one that does not throw), the TM exits after TASK_CANCELLATION_TIMEOUT.
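
      Until the runtime covers this case, one way a user function could avoid triggering the hang is to bound any potentially blocking work inside close. The following is a minimal plain-Java sketch under that assumption, not part of Flink; the class BoundedClose and the method closeWithTimeout are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper, not Flink code: runs cleanup with an upper bound on how long it may block. */
public class BoundedClose {

    /**
     * Runs cleanup on a daemon helper thread and waits at most timeoutMillis.
     * Returns true if cleanup finished in time, false if it was abandoned.
     * An exception thrown by cleanup surfaces as an ExecutionException.
     */
    public static boolean closeWithTimeout(Runnable cleanup, long timeoutMillis) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "bounded-close");
            t.setDaemon(true); // do not keep the JVM alive for abandoned cleanup
            return t;
        });
        Future<?> future = executor.submit(cleanup);
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the helper thread
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        boolean finished = closeWithTimeout(() -> {
            try {
                Thread.sleep(10_000_000L); // same blocking call as in the test case
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, 500);
        System.out.println("cleanup finished in time: " + finished);
    }
}
```

      A close method could call such a helper and return (or throw) once the timeout elapses, so the calling thread is no longer blocked. This only sidesteps the problem in user code; it does not change how the task reports its state.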

            People

              Assignee: Unassigned
              Reporter: YufeiLiu (liuyufei)