FLINK-18983

Job does not switch to FAILED if the close function blocks


Details

    Description

      If an operator throws an exception, the task breaks out of the processing loop and disposes all operators. However, if Function.close blocks, the task state never switches to FAILED, so the JobMaster never learns the final state and cannot restart the job.

      Task has a TaskCancelerWatchDog that kills the process when cancellation times out, but it does not take effect for a failing task. The task thread hangs forever at:
      org.apache.flink.streaming.runtime.tasks.StreamTask#cleanUpInvoke
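
The ordering described above can be illustrated without Flink. The following is a minimal sketch in plain Java, assuming a simplified model in which cleanup runs on the task thread before the state transition; the names BlockedCloseDemo, State and observeState are hypothetical and do not correspond to Flink classes.

```java
import java.util.concurrent.atomic.AtomicReference;

public class BlockedCloseDemo {
    enum State { RUNNING, FAILED }

    /**
     * Runs a simplified "task" whose processing fails immediately and whose
     * cleanup blocks for closeBlockMillis. Returns the state observed after
     * waiting at most waitMillis for the task thread.
     */
    static State observeState(long closeBlockMillis, long waitMillis) throws InterruptedException {
        AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
        Thread task = new Thread(() -> {
            try {
                // Stands in for the operator failure that breaks the processing loop.
                throw new RuntimeException("operator failure");
            } catch (RuntimeException e) {
                try {
                    // Stands in for a user close() that blocks during cleanup.
                    Thread.sleep(closeBlockMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
                // Assumption of this model: the state changes only after cleanup returns.
                state.set(State.FAILED);
            }
        }, "task-thread");
        task.setDaemon(true); // do not keep the JVM alive for the blocked thread
        task.start();
        task.join(waitMillis);
        return state.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // Cleanup blocks for a minute: the observer still sees RUNNING.
        System.out.println(observeState(60_000, 200));
        // Cleanup returns immediately: the failure becomes visible.
        System.out.println(observeState(0, 2_000));
    }
}
```

In this model nothing interrupts the blocked cleanup, which mirrors the report: no watchdog is armed on the failure path, so the final state is never published.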

      Test case:

      Configuration configuration = new Configuration();
      configuration.set(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 10000L);
      StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2, configuration);
      env.addSource(...)
          .process(new ProcessFunction<String, String>() {
              @Override
              public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
                  // Subtask 0 fails on its first element.
                  if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
                      throw new RuntimeException();
                  }
              }

              @Override
              public void close() throws Exception {
                  // Subtask 0 then blocks in close() during cleanup.
                  if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
                      Thread.sleep(10000000);
                  }
              }
          }).setParallelism(2)
          .print();

      In this case, the job blocks in close and never changes to FAILED.
      If the subtask with index 1 sleeps in close instead, the TM exits after TASK_CANCELLATION_TIMEOUT.
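
Until the runtime handles this case, one possible user-side mitigation is to bound the work done in close() with a timeout. The sketch below is plain Java and is not the fix adopted by Flink; BoundedClose and closeWithTimeout are hypothetical names, and it assumes the blocking cleanup can safely be abandoned or interrupted.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedClose {
    /**
     * Runs closeAction on a helper thread and waits at most timeoutMillis.
     * Returns true if the action finished in time, false if it was abandoned.
     * An exception thrown by the action is rethrown wrapped in ExecutionException.
     */
    static boolean closeWithTimeout(Callable<Void> closeAction, long timeoutMillis) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "bounded-close");
            t.setDaemon(true); // an abandoned cleanup must not keep the JVM alive
            return t;
        });
        Future<Void> future = executor.submit(closeAction);
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the helper thread
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // Cleanup that returns immediately completes within the timeout.
        System.out.println(closeWithTimeout(() -> null, 1_000));
        // Cleanup that blocks is abandoned after 200 ms.
        System.out.println(closeWithTimeout(() -> { Thread.sleep(60_000); return null; }, 200));
    }
}
```

Inside a function's close(), the blocking cleanup would be passed as closeAction, so the task thread returns after the timeout and the failure can propagate. Resources held by the abandoned cleanup may leak, so this trades cleanliness for liveness.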

          People

            Assignee: Unassigned
            Reporter: YufeiLiu (liuyufei)
