Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-22551

checkpoints: strange behaviour

    XMLWordPrintableJSON

    Details

    • Type: Bug
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 1.13.0
    • Fix Version/s: None
    • Component/s: None
    • Environment:

      Description

      • Case 1: Works as expected
      /**
       * Reproduction program for Case 1: a logging source feeding a logging print sink,
       * with checkpointing enabled via {@code enableCheckpointing(1000)}.
       *
       * <p>Every lifecycle callback of the source and sink prints a tagged line to stdout so
       * that the order of invocations can be read from the console output.
       */
      public class Example {
      
          /**
           * Source that collects the string {@code "Flink"} and then sleeps 500 ms, repeating
           * while {@link #isRunning} is true. Each overridden callback logs its own invocation.
           */
          public static class ExampleSource extends RichSourceFunction<String>
                  implements CheckpointedFunction {
      
              // Loop flag read by run() and cleared by cancel() and close().
              // NOTE(review): volatile presumably because cancel() is invoked from another thread — confirm.
              private volatile boolean isRunning = true;
      
              /** Logs that {@code open()} was invoked; performs no other setup. */
              @Override
              public void open(Configuration parameters) throws Exception {
                  System.out.println("[source] invoke open()");
              }
      
              /** Clears the loop flag and logs that {@code close()} was invoked. */
              @Override
              public void close() throws Exception {
                  isRunning = false;
                  System.out.println("[source] invoke close()");
              }
      
              /**
               * Logs the invocation, then emits {@code "Flink"} followed by a 500 ms sleep for as
               * long as the loop flag stays true.
               */
              @Override
              public void run(SourceContext<String> ctx) throws Exception {
                  System.out.println("[source] invoke run()");
                  while (isRunning) {
                      ctx.collect("Flink");
                      Thread.sleep(500);
                  }
              }
      
              /** Clears the loop flag so that {@code run()} exits, and logs the invocation. */
              @Override
              public void cancel() {
                  isRunning = false;
                  System.out.println("[source] invoke cancel()");
              }
      
              /** Logs that a snapshot was requested; no state is written. */
              @Override
              public void snapshotState(FunctionSnapshotContext context) throws Exception {
                  System.out.println("[source] invoke snapshotState()");
              }
      
              /** Logs that state initialization was requested; no state is read. */
              @Override
              public void initializeState(FunctionInitializationContext context) throws Exception {
                  System.out.println("[source] invoke initializeState()");
              }
      
          }
      
          /**
           * Sink that inherits its record handling from {@code PrintSinkFunction} and additionally
           * logs the two {@code CheckpointedFunction} callbacks.
           */
          public static class ExampleSink extends PrintSinkFunction<String>
                  implements CheckpointedFunction {
      
              /** Logs that a snapshot was requested; no state is written. */
              @Override
              public void snapshotState(FunctionSnapshotContext context) throws Exception {
                  System.out.println("[sink] invoke snapshotState()");
              }
      
              /** Logs that state initialization was requested; no state is read. */
              @Override
              public void initializeState(FunctionInitializationContext context) throws Exception {
                  System.out.println("[sink] invoke initializeState()");
              }
          }
      
          /**
           * Builds the job (ExampleSource -> ExampleSink with sink parallelism 1) and executes it.
           *
           * @param args command-line arguments (unused)
           * @throws Exception if job construction or execution fails
           */
          public static void main(String[] args) throws Exception {
              // Checkpointing is enabled with the argument 1000 (an interval in ms per the Flink API docs).
              final StreamExecutionEnvironment env =
                      StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(1000);
      
              DataStream<String> stream = env.addSource(new ExampleSource());
              stream.addSink(new ExampleSink()).setParallelism(1);
      
              env.execute();
          }
      }
      
      $ java -jar ./example.jar
      
      [sink] invoke initializeState()
      [source] invoke initializeState()
      [source] invoke open()
      [source] invoke run()
      Flink
      [sink] invoke snapshotState()
      [source] invoke snapshotState()
      Flink
      Flink
      [sink] invoke snapshotState()
      [source] invoke snapshotState()
      Flink
      Flink
      [sink] invoke snapshotState()
      [source] invoke snapshotState()
      ^C
      
      • Case 2: Runs unexpectedly (with sink parallelism = 1)
      /**
       * Reproduction program for Case 2: the same logging source, but writing to a
       * {@code FlinkKafkaProducer} configured with {@code Semantic.EXACTLY_ONCE} and an explicit
       * sink parallelism of 1. Checkpointing is enabled via {@code enableCheckpointing(1000)}.
       */
      public class Example {
      
          /**
           * Source that collects the string {@code "Flink"} and then sleeps 500 ms, repeating
           * while {@link #isRunning} is true. Each overridden callback logs its own invocation.
           */
          public static class ExampleSource extends RichSourceFunction<String>
                  implements CheckpointedFunction {
      
              // Loop flag read by run() and cleared by cancel() and close().
              // NOTE(review): volatile presumably because cancel() is invoked from another thread — confirm.
              private volatile boolean isRunning = true;
      
              /** Logs that {@code open()} was invoked; performs no other setup. */
              @Override
              public void open(Configuration parameters) throws Exception {
                  System.out.println("[source] invoke open()");
              }
      
              /** Clears the loop flag and logs that {@code close()} was invoked. */
              @Override
              public void close() throws Exception {
                  isRunning = false;
                  System.out.println("[source] invoke close()");
              }
      
              /**
               * Logs the invocation, then emits {@code "Flink"} followed by a 500 ms sleep for as
               * long as the loop flag stays true.
               */
              @Override
              public void run(SourceContext<String> ctx) throws Exception {
                  System.out.println("[source] invoke run()");
                  while (isRunning) {
                      ctx.collect("Flink");
                      Thread.sleep(500);
                  }
              }
      
              /** Clears the loop flag so that {@code run()} exits, and logs the invocation. */
              @Override
              public void cancel() {
                  isRunning = false;
                  System.out.println("[source] invoke cancel()");
              }
      
              /** Logs that a snapshot was requested; no state is written. */
              @Override
              public void snapshotState(FunctionSnapshotContext context) throws Exception {
                  System.out.println("[source] invoke snapshotState()");
              }
      
              /** Logs that state initialization was requested; no state is read. */
              @Override
              public void initializeState(FunctionInitializationContext context) throws Exception {
                  System.out.println("[source] invoke initializeState()");
              }
          }
      
      
          /**
           * Builds the job (ExampleSource -> Kafka producer with sink parallelism 1) and executes it.
           *
           * @param args command-line arguments (unused)
           * @throws Exception if job construction or execution fails
           */
          public static void main(String[] args) throws Exception {
              // Checkpointing is enabled with the argument 1000 (an interval in ms per the Flink API docs).
              final StreamExecutionEnvironment env =
                      StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(1000);
      
              DataStream<String> stream = env.addSource(new ExampleSource());
      
              // Kafka client settings: only the bootstrap server address is configured here.
              Properties properties = new Properties();
              properties.setProperty("bootstrap.servers", "localhost:9092");
              String topic = "my-topic";
      
              // The serialization lambda encodes each element as UTF-8 bytes and wraps it in a
              // ProducerRecord for the topic, passing the record timestamp through.
              // NOTE(review): the three null arguments are presumably partition, key and headers
              // per the Kafka ProducerRecord constructor — confirm.
              FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
                      topic,
                      (element, timestamp) -> {
                          byte[] value = element.getBytes(StandardCharsets.UTF_8);
                          return new ProducerRecord<>(topic, null, timestamp, null, value, null);
                      },
                      properties,
                      FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
      
              // Unlike Case 3, the sink parallelism is set explicitly to 1.
              stream.addSink(kafkaProducer).setParallelism(1);
      
              env.execute();
          }
      
      }
      
       $ java -jar ./example.jar
      
      [source] invoke cancel() 
      [source] invoke cancel() 
      [source] invoke cancel() 
      [source] invoke cancel() 
      ^C%
      

      • Case 3: Runs unexpectedly (without explicit sink parallelism)

      /**
       * Reproduction program for Case 3: the logging source writing to a
       * {@code FlinkKafkaProducer} configured with {@code Semantic.EXACTLY_ONCE}, with no
       * explicit parallelism set on the sink. Checkpointing is enabled via
       * {@code enableCheckpointing(1000)}.
       */
      public class Example {
      
          /**
           * Source that collects the string {@code "Flink"} and then sleeps 500 ms, repeating
           * while {@link #isRunning} is true. Each overridden callback logs its own invocation.
           */
          public static class ExampleSource extends RichSourceFunction<String>
                  implements CheckpointedFunction {
      
              // Loop flag read by run() and cleared by cancel() and close().
              // NOTE(review): volatile presumably because cancel() is invoked from another thread — confirm.
              private volatile boolean isRunning = true;
      
              /** Logs that {@code open()} was invoked; performs no other setup. */
              @Override
              public void open(Configuration parameters) throws Exception {
                  System.out.println("[source] invoke open()");
              }
      
              /** Clears the loop flag and logs that {@code close()} was invoked. */
              @Override
              public void close() throws Exception {
                  isRunning = false;
                  System.out.println("[source] invoke close()");
              }
      
              /**
               * Logs the invocation, then emits {@code "Flink"} followed by a 500 ms sleep for as
               * long as the loop flag stays true.
               */
              @Override
              public void run(SourceContext<String> ctx) throws Exception {
                  System.out.println("[source] invoke run()");
                  while (isRunning) {
                      ctx.collect("Flink");
                      Thread.sleep(500);
                  }
              }
      
              /** Clears the loop flag so that {@code run()} exits, and logs the invocation. */
              @Override
              public void cancel() {
                  isRunning = false;
                  System.out.println("[source] invoke cancel()");
              }
      
              /** Logs that a snapshot was requested; no state is written. */
              @Override
              public void snapshotState(FunctionSnapshotContext context) throws Exception {
                  System.out.println("[source] invoke snapshotState()");
              }
      
              /** Logs that state initialization was requested; no state is read. */
              @Override
              public void initializeState(FunctionInitializationContext context) throws Exception {
                  System.out.println("[source] invoke initializeState()");
              }
      
          }
      
          /**
           * Print sink that logs the two {@code CheckpointedFunction} callbacks.
           *
           * <p>Declared in this case but never attached to the stream: {@code main} adds only the
           * Kafka producer as a sink.
           */
          public static class ExampleSink extends PrintSinkFunction<String>
                  implements CheckpointedFunction {
      
              /** Logs that a snapshot was requested; no state is written. */
              @Override
              public void snapshotState(FunctionSnapshotContext context) throws Exception {
                  System.out.println("[sink] invoke snapshotState()");
              }
      
              /** Logs that state initialization was requested; no state is read. */
              @Override
              public void initializeState(FunctionInitializationContext context) throws Exception {
                  System.out.println("[sink] invoke initializeState()");
              }
          }
      
          /**
           * Builds the job (ExampleSource -> Kafka producer, no explicit sink parallelism) and
           * executes it.
           *
           * @param args command-line arguments (unused)
           * @throws Exception if job construction or execution fails
           */
          public static void main(String[] args) throws Exception {
              // Checkpointing is enabled with the argument 1000 (an interval in ms per the Flink API docs).
              final StreamExecutionEnvironment env =
                      StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(1000);
      
              DataStream<String> stream = env.addSource(new ExampleSource());
      
              // Kafka client settings: only the bootstrap server address is configured here.
              Properties properties = new Properties();
              properties.setProperty("bootstrap.servers", "localhost:9092");
              String topic = "my-topic";
      
              // The serialization lambda encodes each element as UTF-8 bytes and wraps it in a
              // ProducerRecord for the topic, passing the record timestamp through.
              // NOTE(review): the three null arguments are presumably partition, key and headers
              // per the Kafka ProducerRecord constructor — confirm.
              FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
                      topic,
                      (element, timestamp) -> {
                          byte[] value = element.getBytes(StandardCharsets.UTF_8);
                          return new ProducerRecord<>(topic, null, timestamp, null, value, null);
                      },
                      properties,
                      FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
      
              // No setParallelism(...) call here, so the sink runs with the environment's default.
              stream.addSink(kafkaProducer);
              env.execute();
          }
      
      }
      $ java -jar ./example.jar
      
      [source] invoke initializeState()
      [source] invoke open()
      [source] invoke run()
      [source] invoke cancel()
      [source] invoke close()
      [source] invoke initializeState()
      [source] invoke open()
      [source] invoke run()
      [source] invoke cancel()
      [source] invoke close()
      [source] invoke initializeState()
      [source] invoke open()
      [source] invoke run()
      [source] invoke snapshotState()
      [source] invoke cancel()
      [source] invoke close()
      ^C%
      

        Attachments

          Activity

            People

            • Assignee:
              Unassigned
              Reporter:
              buom buom
            • Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

              • Created:
                Updated: