FLINK-31632

Watermark-aligned idle source can't resume


Details

    Description

       

      // Alignment group "group": max allowed drift 10 ms, update interval 2 s; a source is marked idle after 10 s without records.
      WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
              .<String>forBoundedOutOfOrderness(Duration.ofMillis(0))
              .withTimestampAssigner((element, recordTimestamp) -> Long.parseLong(element))
              .withWatermarkAlignment("group", Duration.ofMillis(10), Duration.ofSeconds(2))
              .withIdleness(Duration.ofSeconds(10));
      // kafkaSource(topic) is not shown here; it is assumed to build a KafkaSource<String> for the given topic.
      DataStreamSource<String> s1 = env.fromSource(kafkaSource("topic1"), watermarkStrategy, "S1");
      DataStreamSource<String> s2 = env.fromSource(kafkaSource("topic2"), watermarkStrategy, "S2");

      Send the record "0" to the Kafka topics topic1 and topic2.

       

      After 10 s, both sources (S1 and S2) are marked idle, and the logs show:

       

      09:44:30,403 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ -1 (1970-01-01 07:59:59.999) from subTaskId=0
      09:44:30,404 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ -1 (1970-01-01 07:59:59.999) from subTaskId=0
      09:44:32,019 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing maxAllowedWatermark=9 to subTaskIds=[0]
      09:44:32,019 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing maxAllowedWatermark=9 to subTaskIds=[0]
      09:44:32,417 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ -1 (1970-01-01 07:59:59.999) from subTaskId=0
      09:44:32,418 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ -1 (1970-01-01 07:59:59.999) from subTaskId=0
      09:44:34,028 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing maxAllowedWatermark=9 to subTaskIds=[0]
      09:44:34,028 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing maxAllowedWatermark=9 to subTaskIds=[0]
      09:44:34,423 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 15:12:55.807) from subTaskId=0
      09:44:34,424 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 15:12:55.807) from subTaskId=0
      09:44:36,023 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing maxAllowedWatermark=-9223372036854775799 to subTaskIds=[0]
      09:44:36,023 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing maxAllowedWatermark=-9223372036854775799 to subTaskIds=[0]
      09:44:36,433 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 15:12:55.807) from subTaskId=0
      09:44:36,433 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 15:12:55.807) from subTaskId=0 

      If a message is sent to topic1 or topic2 at this point, it is never consumed.

       

      The reason is:

      When a source is marked idle, lastEmittedWatermark = Long.MAX_VALUE and
      currentMaxDesiredWatermark = Long.MAX_VALUE + maxAllowedWatermarkDrift in org.apache.flink.streaming.api.operators.SourceOperator.
      The addition overflows, so currentMaxDesiredWatermark is negative (the logs above show -9223372036854775799 for a drift of 10 ms) and is always less than lastEmittedWatermark.
      As a result, operatingMode stays WAITING_FOR_ALIGNMENT indefinitely.
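
The overflow described above can be reproduced with plain long arithmetic, without running Flink. The following is a minimal sketch under stated assumptions: the class and method names are hypothetical, the comparison is a simplified model of the alignment check rather than the actual SourceOperator code, and the saturating addition is only one possible guard, not necessarily the fix adopted in Flink.

```java
public class AlignmentOverflowSketch {

    // Simplified model of the desired-watermark computation:
    // plain long addition silently wraps around on overflow.
    static long naiveMaxDesired(long groupWatermark, long maxAllowedDriftMs) {
        return groupWatermark + maxAllowedDriftMs;
    }

    // Hypothetical guard: clamp to Long.MAX_VALUE instead of wrapping.
    // Assumes maxAllowedDriftMs is non-negative.
    static long saturatingMaxDesired(long groupWatermark, long maxAllowedDriftMs) {
        try {
            return Math.addExact(groupWatermark, maxAllowedDriftMs);
        } catch (ArithmeticException overflow) {
            return Long.MAX_VALUE;
        }
    }

    // Simplified model of the alignment check: the source has to wait
    // while its last emitted watermark is ahead of the allowed maximum.
    static boolean mustWaitForAlignment(long lastEmittedWatermark, long currentMaxDesiredWatermark) {
        return currentMaxDesiredWatermark < lastEmittedWatermark;
    }

    public static void main(String[] args) {
        long idleWatermark = Long.MAX_VALUE; // watermark reported once the source is idle
        long driftMs = 10L;                  // Duration.ofMillis(10) from the strategy above

        long naive = naiveMaxDesired(idleWatermark, driftMs);
        System.out.println(naive);                                      // -9223372036854775799
        System.out.println(mustWaitForAlignment(idleWatermark, naive)); // true: never resumes

        long clamped = saturatingMaxDesired(idleWatermark, driftMs);
        System.out.println(mustWaitForAlignment(idleWatermark, clamped)); // false
    }
}
```

The wrapped value matches the maxAllowedWatermark=-9223372036854775799 reported in the logs, which is consistent with the explanation above.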

          People

            haishui haishui