FLINK-29816

Fix the bug that StreamTask doesn't handle exception during restoring


Details

    Description

      1. How to reproduce

      Use a ProcessWindowFunction whose process() throws an exception.
      Test code:

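      import org.apache.flink.api.common.eventtime.WatermarkStrategy;
      import org.apache.flink.api.common.serialization.SimpleStringSchema;
      import org.apache.flink.connector.kafka.source.KafkaSource;
      import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
      import org.apache.flink.streaming.api.CheckpointingMode;
      import org.apache.flink.streaming.api.datastream.DataStreamSource;
      import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
      import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
      import org.apache.flink.streaming.api.windowing.time.Time;
      import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
      import org.apache.flink.util.Collector;

      public class KafkaCheckpointWindowProcessTest {
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.setParallelism(1);
              // Checkpoint every 60 s with a 60 s timeout.
              env.enableCheckpointing(60 * 1000);
              env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
              env.getCheckpointConfig().setCheckpointTimeout(60000);

              KafkaSource<String> kafkaConsumer = KafkaSource.<String>builder()
                      .setBootstrapServers("****")
                      .setTopics("****")
                      .setGroupId("****")
                      .setValueOnlyDeserializer(new SimpleStringSchema())
                      .setStartingOffsets(OffsetsInitializer.earliest())
                      .build();

              DataStreamSource<String> kafkaSource = env.fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "Kafka Source");

              SingleOutputStreamOperator<String> mapSource = kafkaSource
                      .keyBy(s -> s)
                      .window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
                      .process(new ProcessWindowFunction<String, String, String, TimeWindow>() {
                          @Override
                          public void process(String s, Context context, Iterable<String> iterable, Collector<String> collector) throws Exception {
                              // Throws java.lang.NumberFormatException when the event is "abc".
                              Integer.valueOf(s);
                              collector.collect(s);
                          }
                      })
                      .name("name-process").uid("uid-process");

              mapSource.print();
              env.execute();
          }
      }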
      

      Kafka input events:

      >1
      >1
      >2
      >2
      >3
      >3
      >abc
      >abc
      >
      

      2. Observed behavior

      When the job processes the event "abc", it throws java.lang.NumberFormatException and fails over. The job would be expected to keep failing over on every subsequent attempt.
      However, it fails over only twice (attempt 0 and attempt 1). The third attempt (attempt 2) runs normally, with no exception.

      Checkpoint 1 completed during attempt 1, before that attempt's failover exception:

      2022-10-31 16:59:53,644 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 1 (type=CHECKPOINT) @ 1667206793605 for job 7bca78a75b089d447bb4c99efcfd6527.
      2022-10-31 16:59:54,010 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 1 for job 7bca78a75b089d447bb4c99efcfd6527 (21630 bytes, checkpointDuration=333 ms, finalizationTime=72 ms).

       

      Attempt 2 was restored from checkpoint 1:

      2022-10-31 17:00:30,033 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Restoring job 7bca78a75b089d447bb4c99efcfd6527 from Checkpoint 1 @ 1667206793605 for 7bca78a75b089d447bb4c99efcfd6527 located at hdfs://eadhadoop/user/sloth/sloth-fs-checkpoints/meta/1_7/7bca78a75b089d447bb4c99efcfd6527/chk-1.
      

       

       

      3. Possible cause

      During attempt 2, the task restores from the checkpoint. The user function of the ProcessWindowFunction is called inside StreamTask.restore and throws java.lang.NumberFormatException. StreamTask catches the exception but does not handle it, because the subtask is not yet in the RUNNING state.

      Stack trace in attempt 2:
      the user function was called from StreamTask.restore (subtask state is INITIALIZING).

      java.lang.Thread.getStackTrace(Thread.java:1552)
      com.youdao.analysis.KafkaCheckpointWindowProcessTest$1.process(KafkaCheckpointWindowProcessTest.java:45)
      com.youdao.analysis.KafkaCheckpointWindowProcessTest$1.process(KafkaCheckpointWindowProcessTest.java:40)
      org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:57)
      org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
      org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:568)
      org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:524)
      org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284)
      org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1693)
      org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1684)
      org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
      org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
      org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
      org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
      org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
      org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:690)
      org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
      org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
      org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
      org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
      org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
      java.lang.Thread.run(Thread.java:745)
      

      Stack trace (which caused the failover) in attempt 0 and attempt 1:
      the user function was called from StreamTask.invoke.

      com.youdao.analysis.KafkaCheckpointWindowProcessTest$1.process(KafkaCheckpointWindowProcessTest.java:45)
      com.youdao.analysis.KafkaCheckpointWindowProcessTest$1.process(KafkaCheckpointWindowProcessTest.java:40)
      org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:57)
      org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
      org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:568)
      org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:524)
      org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284)
      org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1693)
      org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1684)
      org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
      org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
      org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
      org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
      org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
      org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
      org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
      org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
      org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
      org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
      org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
      java.lang.Thread.run(Thread.java:745)
      

      In org.apache.flink.streaming.runtime.tasks.StreamTask, handleAsyncException
      only forwards the exception when isRunning == true:
      https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L1540

          @Override
          public void handleAsyncException(String message, Throwable exception) {
              if (isRunning) {
                  // only fail if the task is still running
                  asyncExceptionHandler.handleAsyncException(message, exception);
              }
          }
      

      But during restore, isRunning == false:
      https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L673

       

      So during StreamTask.restore, StreamTask silently drops the exception thrown by the user function of the ProcessWindowFunction.
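
      The effect of the isRunning guard can be illustrated with a minimal, self-contained sketch. This is not Flink code: MiniTask, fireTimer, the reported list, and the message string are hypothetical stand-ins that only mimic the guard shown above, under the assumption that a timer callback fires both before and after the task starts running.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified illustration of the guard; NOT the Flink implementation. */
public class RestoreGuardSketch {

    /** Hypothetical miniature task that mimics only the isRunning guard. */
    static class MiniTask {
        // false while "restoring", true once the task is "running"
        volatile boolean isRunning = false;
        // stands in for asyncExceptionHandler: exceptions that would fail the task
        final List<Throwable> reported = new ArrayList<>();

        void handleAsyncException(String message, Throwable exception) {
            if (isRunning) {
                // only fail if the task is running
                reported.add(exception);
            }
            // otherwise the exception is dropped without any trace
        }

        /** Stand-in for a processing-time timer invoking the user function. */
        void fireTimer(String key) {
            try {
                Integer.valueOf(key); // throws NumberFormatException for "abc"
            } catch (Throwable t) {
                handleAsyncException("timer callback failed (hypothetical message)", t);
            }
        }
    }

    public static void main(String[] args) {
        MiniTask task = new MiniTask();

        // Timer fires during restore: isRunning == false, exception is swallowed.
        task.fireTimer("abc");
        System.out.println("reported during restore: " + task.reported.size());

        // Timer fires after the task started running: exception is forwarded.
        task.isRunning = true;
        task.fireTimer("abc");
        System.out.println("reported while running: " + task.reported.size());
    }
}
```

      In this sketch the first call reports nothing and the second reports one exception, which mirrors the difference between attempt 2 and attempts 0 and 1. One conceivable direction for a fix would be to forward exceptions while the task is restoring as well; that is an assumption, not something this report states.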

       

       

       

       


            People

              RocMarshal RocMarshal
              xieyi Xie Yi