Flink / FLINK-32828

Partition-aware watermark not handled correctly shortly after job startup from checkpoint or savepoint


Details

    • Type: Bug
    • Status: Closed
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 1.17.1, 1.19.0, 1.18.1
    • Fix Version/s: 1.18.2, 1.20.0, 1.19.1

    Description

      When using a KafkaSource with partition-aware watermarks, watermarks are emitted even when only one partition has received events, but only shortly after the job has started from a savepoint/checkpoint. Once events have arrived on the other partitions as well, the behaviour is correct again and the watermark is emitted as the minimum watermark across all partitions.
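
      For context, a minimal sketch of the kind of job used to reproduce this follows below (the attached test-job.java is the authoritative reproducer; the class name, topic name, group id and the print sink here are placeholders):

        import java.time.Duration;

        import org.apache.flink.api.common.eventtime.WatermarkStrategy;
        import org.apache.flink.api.common.serialization.SimpleStringSchema;
        import org.apache.flink.connector.kafka.source.KafkaSource;
        import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

        public class WatermarkReproJob {
            public static void main(String[] args) throws Exception {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(2);            // lower than the 4 partitions of test-2
                env.enableCheckpointing(10_000L); // so there is a checkpoint to restore from

                KafkaSource<String> source = KafkaSource.<String>builder()
                        .setBootstrapServers("localhost:9092")
                        .setTopics("test-2")
                        .setGroupId("watermark-repro")
                        .setStartingOffsets(OffsetsInitializer.earliest())
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();

                env.fromSource(
                                source,
                                WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10L)),
                                "kafka-source")
                        .print(); // the attached job instead logs each record's timestamp and the current watermark

                env.execute("watermark-repro");
            }
        }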

       

      Steps to reproduce:

      1. Set up a Kafka cluster with a topic that has 2 or more partitions (see the attached docker-compose.yml):
        1. ./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-2 --partitions 4
      2. Create a job that (see the attached `test-job.java` and the sketch above):
        1. uses a KafkaSource with `WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10L))`
        2. has a parallelism lower than the number of partitions
        3. takes checkpoints/savepoints
      3. Start the job
      4. Send events only on a single partition:
        1. ./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-2 --property "parse.key=true" --property "key.separator=:"
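
      With parse.key=true, each console-producer line is a key:value pair, and with the default partitioner a given key always hashes to the same partition, so repeatedly sending the same key keeps all traffic on one partition. In the log excerpts below, key 1 lands on test-2/3 while keys 2, 4 and 5 land on the other partitions; for example, input such as:

        1:a
        1:a
        1:a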

       

      14:51:19,883 WARN  com.example.TestJob6$InputSink2                   [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:18.849Z, watermark -292275055-05-16T16:47:04.192Z
      14:51:32,484 WARN  com.example.TestJob6$InputSink2                   [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:31.475Z, watermark -292275055-05-16T16:47:04.192Z
      14:51:35,914 WARN  com.example.TestJob6$InputSink2                   [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:51:34.909Z, watermark -292275055-05-16T16:47:04.192Z

      Expected: the watermark does not progress (only one partition has data). Actual: as expected, the watermark stays at the initial Long.MIN_VALUE watermark (-292275055-05-16T16:47:04.192Z in the log).

      5. Stop the job

      6. Start the job again from the last checkpoint/savepoint

      7. Send events only on a single partition

      14:53:41,693 WARN  com.example.TestJob6$InputSink2                   [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:40.662Z, watermark -292275055-05-16T16:47:04.192Z
      14:53:46,088 WARN  com.example.TestJob6$InputSink2                   [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:45.078Z, watermark 2023-08-10T12:53:30.661Z
      14:53:49,520 WARN  com.example.TestJob6$InputSink2                   [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:53:48.511Z, watermark 2023-08-10T12:53:35.077Z

      Expected: the watermark does not progress (still only one partition has data). Actual: the watermark progresses.
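
      Note that the post-restore watermarks match exactly what `forBoundedOutOfOrderness(Duration.ofSeconds(10L))` produces for the single active partition alone (maximum seen timestamp - 10 s - 1 ms, e.g. 2023-08-10T12:53:40.662Z - 10.001 s = 2023-08-10T12:53:30.661Z in the second log line), which is consistent with the restored but still-empty partitions not being taken into account in the minimum.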

       

      To add a bit more context:

      8. Send events on the other partitions and then send events only on a single partition

      14:54:55,112 WARN  com.example.TestJob6$InputSink2                   [] - == Received: test-2/0: 2 -> a, timestamp 2023-08-10T12:54:54.104Z, watermark 2023-08-10T12:53:38.510Z
      14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == Received: test-2/1: 4 -> a, timestamp 2023-08-10T12:54:56.665Z, watermark 2023-08-10T12:53:38.510Z
      14:54:57,673 WARN  com.example.TestJob6$InputSink2                   [] - == Received: test-2/2: 5 -> a, timestamp 2023-08-10T12:54:57.554Z, watermark 2023-08-10T12:53:38.510Z
      14:55:12,821 WARN  com.example.TestJob6$InputSink2                   [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:11.814Z, watermark 2023-08-10T12:54:44.103Z
      14:55:16,099 WARN  com.example.TestJob6$InputSink2                   [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:15.091Z, watermark 2023-08-10T12:54:44.103Z
      14:55:19,122 WARN  com.example.TestJob6$InputSink2                   [] - == Received: test-2/3: 1 -> a, timestamp 2023-08-10T12:55:18.114Z, watermark 2023-08-10T12:54:44.103Z

      Expected: the watermark progresses a bit and then stops progressing while events arrive only on a single partition.

      Actual: As expected

       

       

      This behavior also shows up as a burst of late events just after startup, and then no more late events once the job operates normally.
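
      For illustration only (this is not Flink's internal implementation, just a sketch of the per-partition semantics the report relies on): the combined watermark should be the minimum over all assigned partitions, so a partition that is assigned, including one restored from a checkpoint/savepoint, but has not produced events yet should hold the combined watermark at Long.MIN_VALUE, exactly as in the run before the restart:

        import java.util.HashMap;
        import java.util.Map;

        /** Illustrative sketch of per-partition watermark combination (not Flink code). */
        class CombinedWatermarkSketch {
            private final Map<String, Long> perPartitionWatermark = new HashMap<>();

            /** Every assigned partition must be registered up front (including partitions
             *  restored from a checkpoint/savepoint), so that a partition without events
             *  still holds the combined watermark back. */
            void registerPartition(String topicPartition) {
                perPartitionWatermark.putIfAbsent(topicPartition, Long.MIN_VALUE);
            }

            /** forBoundedOutOfOrderness-style update: watermark = max timestamp - delay - 1 ms. */
            void onRecord(String topicPartition, long timestampMillis, long outOfOrdernessMillis) {
                perPartitionWatermark.merge(
                        topicPartition, timestampMillis - outOfOrdernessMillis - 1, Math::max);
            }

            /** The combined watermark is the minimum over all registered partitions. */
            long combinedWatermark() {
                return perPartitionWatermark.values().stream()
                        .mapToLong(Long::longValue)
                        .min()
                        .orElse(Long.MIN_VALUE);
            }
        }

      The post-restore logs above look as if only the partitions that have produced events since the restore take part in that minimum.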

      Attachments

        1. docker-compose.yml (0.8 kB, Grzegorz Liter)
        2. test-job.java (5 kB, Grzegorz Liter)


          People

            Assignee: Piotr Nowojski (pnowojski)
            Reporter: Grzegorz Liter (gliter)
            Votes: 5
            Watchers: 9
