Flink / FLINK-30511

Exception in user-timer Triggerable is ignored when recovering from state.


Details

    • Type: Bug
    • Status: Closed
    • Priority: Minor
    • Resolution: Duplicate
    • Affects Version/s: 1.16.0
    • Fix Version/s: None
    • Component/s: API / DataStream
    • Labels: None
    • Environment: Flink 1.16.0

      Java 8

      Deployment mode: MiniCluster in IDC; standalone, yarn-application.

    Description

      • Code segment:
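      import org.apache.flink.api.common.typeinfo.TypeInformation;
      import org.apache.flink.api.common.typeinfo.Types;
      import org.apache.flink.api.java.functions.KeySelector;
      import org.apache.flink.api.java.typeutils.RowTypeInfo;
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
      import org.apache.flink.table.api.EnvironmentSettings;
      import org.apache.flink.table.api.Table;
      import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
      import org.apache.flink.types.Row;
      import org.apache.flink.util.Collector;

      public class OnTimerDemo {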
      
          public static void main(String[] args) throws Exception {
              Configuration conf = new Configuration();
              conf.setString("taskmanager.numberOfTaskSlots", "4");
              conf.setString("state.checkpoint-storage", "filesystem");
              conf.setString("state.checkpoints.dir", "file:///tmp/flinkjob");
              conf.setString("execution.checkpointing.interval", "30s");
      
              //conf.setString("execution.savepoint.path", "file:///tmp/flinkjob/159561b8c97c9e0b4f9eeb649086796a/chk-1"); // Anchor-A:
      
              StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
      
              env.setParallelism(1);
      
              EnvironmentSettings envSetting = EnvironmentSettings
                      .newInstance()
                      .inStreamingMode()
                      .build();
      
              StreamTableEnvironment tableEnv =  StreamTableEnvironment.create(env, envSetting);
      
              String sourceDDL = "CREATE TABLE orders (\n" +
                      "  id           INT,\n" +
                      "  app          INT,\n" +
                      "  user_id      STRING" +
                      ") WITH (\n" +
                      "   'connector' = 'datagen',\n" +
                      "   'rows-per-second'='1',\n" +
                      "   'fields.app.min'='1',\n" +
                      "   'fields.app.max'='10',\n" +
                      "   'fields.user_id.length'='10'\n" +
                      ")";
      
              tableEnv.executeSql(sourceDDL);
      
              Table query = tableEnv.sqlQuery("select * from orders");
              DataStream<Row> rowDataStream = tableEnv.toAppendStream(query, Row.class);
      
              TypeInformation<?>[] returnTypes = new TypeInformation[4];
              returnTypes[0] = Types.INT;
      
              returnTypes[1] = Types.INT; // Anchor-B:
      
              returnTypes[2] = Types.INT;
              returnTypes[3] = Types.INT;
      
      
              rowDataStream.keyBy(new KeySelector<Row, String>() {
                          @Override
                          public String getKey(Row value) throws Exception {
                              return value.getFieldAs(2);
                          }
                      }).process(new KeyedProcessFunction<String, Row, Row>() {
      
                          private Row firstRow;
      
                          @Override
                          public void processElement(Row value, Context ctx, Collector<Row> out) throws Exception {
                              if (firstRow == null) {
                                  firstRow = value;
                              }
                              ctx.timerService().registerProcessingTimeTimer(System.currentTimeMillis() + 3000);
                          }
      
                          @Override
                          public void onTimer(long timestamp, OnTimerContext ctx, Collector<Row> out) throws Exception {
                              Row colRow = new Row(4);
                              colRow.setField(0, 0);
                              colRow.setField(1, 1);
                              colRow.setField(2, 2);
                              colRow.setField(3, 3);
      
                              // Anchor-C: once Anchor-B declares Types.LONG, field 1 (an Integer) no longer matches the declared type
                              out.collect(colRow);
      
                          }
                      }).name("TargetTestUDF")
                      .returns(new RowTypeInfo(returnTypes))
                      .print();
      
              env.execute(OnTimerDemo.class.getSimpleName());
          }
      
      }
       
      • Steps to reproduce
        • Run the job without state.
        • Note the path of the latest available checkpoint as 'checkpoint-path-a'.
        • Stop the job.
        • Fill in the actual value of 'checkpoint-path-a' on the 'Anchor-A' line and uncomment that line.
        • Change 'returnTypes[1] = Types.INT;' to 'returnTypes[1] = Types.LONG;' on the 'Anchor-B' line.
        • Set a breakpoint in the 'StreamTask#handleAsyncException' method.
        • Run the job. The 'java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long' thrown at the 'Anchor-C' line is ignored in the 'StreamTask#handleAsyncException' method.
        • As a result, the framework never catches this exception in this scenario.
      • Root cause:
        • When the job starts from state, Task#restoreAndInvoke is called. The exception 'java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long' is ignored in the 'handleAsyncException' method mentioned above instead of being caught in the catch block of 'Task#restoreAndInvoke'.
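To make the root cause above concrete, here is a simplified, single-threaded model in plain Java. It is not Flink code: `ModelTask`, its `isRunning` flag, `fireTimer`, `restoreAndInvoke` and `handleAsyncException` are hypothetical stand-ins named after the methods in this report. The guard that drops exceptions while the task is not yet running is an assumption about the behavior seen at the breakpoint, and so is the assumption that already-due restored timers fire before the task is marked running. The timer callback performs the same Integer-to-Long cast that fails at 'Anchor-C' once 'Anchor-B' declares `Types.LONG`.

```java
import java.util.ArrayList;
import java.util.List;

public class SwallowedTimerExceptionModel {

    /** Hypothetical stand-in for a stream task; NOT Flink's StreamTask. */
    static class ModelTask {
        // Assumption: the task is only marked running after restore completes.
        volatile boolean isRunning = false;
        // Exceptions that actually reached the (model) failure handling.
        final List<Throwable> reported = new ArrayList<>();

        /** Assumed behavior: async exceptions are dropped unless the task is running. */
        void handleAsyncException(String message, Throwable t) {
            if (isRunning) {
                reported.add(t);
            }
            // Otherwise the exception is silently discarded.
        }

        /** Timer callback emitting an Integer where a Long was declared. */
        void fireTimer() {
            try {
                Object emitted = Integer.valueOf(1); // value set in onTimer
                Long serialized = (Long) emitted;    // throws ClassCastException
                System.out.println(serialized);
            } catch (Throwable t) {
                handleAsyncException("timer callback failed", t);
            }
        }

        /** Assumption: restored, already-due timers fire before isRunning is set. */
        void restoreAndInvoke() {
            fireTimer();
            isRunning = true;
        }
    }

    public static void main(String[] args) {
        ModelTask task = new ModelTask();
        task.restoreAndInvoke();
        // prints "reported during restore: 0": the ClassCastException was dropped
        System.out.println("reported during restore: " + task.reported.size());

        task.fireTimer(); // the same failure once the task is running
        // prints "reported after running: 1"
        System.out.println("reported after running: " + task.reported.size());
    }
}
```

The contrast between the two printed counts mirrors the report: the identical exception is handled when the task is running but disappears when it occurs during restore.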

                           

      Could this be considered missing exception handling in the framework?
      If so, I would prefer to re-throw the exception in 'StreamTask#handleAsyncException', which matches the intent of 'Task#restoreAndInvoke'.
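The proposal above can be sketched with the same kind of simplified model. Again, `ModelTask`, `failureCause` and the other names are hypothetical and this is not a patch against Flink: `handleAsyncException` re-throws instead of dropping the exception, and the outer `restoreAndInvoke` catch block records it as the failure cause. The re-thrown exception reaches that catch block only because everything here runs on one call stack; how a real change would forward the exception from the timer context to `Task#restoreAndInvoke` is not modeled.

```java
import java.util.concurrent.atomic.AtomicReference;

public class RethrowDuringRestoreModel {

    /** Hypothetical stand-in for a stream task with the proposed handling. */
    static class ModelTask {
        volatile boolean isRunning = false;
        // Records the failure the way the task's outer catch block would.
        final AtomicReference<Throwable> failureCause = new AtomicReference<>();

        /** Proposed behavior: never drop the exception; re-throw it. */
        void handleAsyncException(String message, Throwable t) {
            throw new RuntimeException(message, t);
        }

        /** Timer callback emitting an Integer where a Long was declared. */
        void fireTimer() {
            try {
                Object emitted = Integer.valueOf(1);
                Long serialized = (Long) emitted; // throws ClassCastException
                System.out.println(serialized);
            } catch (Throwable t) {
                handleAsyncException("timer callback failed", t);
            }
        }

        /** Outer entry point: its catch block now sees the timer failure. */
        void restoreAndInvoke() {
            try {
                fireTimer();
                isRunning = true;
            } catch (Throwable t) {
                failureCause.set(t);
            }
        }
    }

    public static void main(String[] args) {
        ModelTask task = new ModelTask();
        task.restoreAndInvoke();
        Throwable cause = task.failureCause.get();
        // prints "task failed: true"
        System.out.println("task failed: " + (cause != null));
        // prints "root cause: ClassCastException"
        System.out.println("root cause: " + cause.getCause().getClass().getSimpleName());
    }
}
```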

      Thank you.
       
       
       

       

      Attachments

        1. 截屏2022-12-27 18.51.12.png (screenshot), 193 kB, uploaded by RocMarshal
        2. 截屏2022-12-27 19.20.00.png (screenshot), 521 kB, uploaded by RocMarshal


            People

              Assignee: Unassigned
              Reporter: RocMarshal
              Votes: 0
              Watchers: 4
