Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-6116

Watermarks don't work when unioning with same DataStream

    XMLWordPrintableJSON

Details

    Description

      In this example job we don't get any watermarks in the WatermarkObserver:

      public class WatermarkTest {
      	public static void main(String[] args) throws Exception {
      
      		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
      		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
      		env.getConfig().setAutoWatermarkInterval(1000);
      		env.setParallelism(1);
      
      		DataStreamSource<String> input = env.addSource(new SourceFunction<String>() {
      			@Override
      			public void run(SourceContext<String> ctx) throws Exception {
      				while (true) {
      					ctx.collect("hello!");
      					Thread.sleep(800);
      				}
      			}
      
      			@Override
      			public void cancel() {
      
      			}
      		});
      
      		input.union(input)
      				.flatMap(new IdentityFlatMap())
      				.transform("WatermarkOp", BasicTypeInfo.STRING_TYPE_INFO, new WatermarkObserver());
      
      		env.execute();
      	}
      
      	public static class WatermarkObserver
      			extends AbstractStreamOperator<String>
      			implements OneInputStreamOperator<String, String> {
      		@Override
      		public void processElement(StreamRecord<String> element) throws Exception {
      			System.out.println("GOT ELEMENT: " + element);
      		}
      
      		@Override
      		public void processWatermark(Watermark mark) throws Exception {
      			super.processWatermark(mark);
      
      			System.out.println("GOT WATERMARK: " + mark);
      		}
      	}
      
      	private static class IdentityFlatMap
      			extends RichFlatMapFunction<String, String> {
      		@Override
      		public void flatMap(String value, Collector<String> out) throws Exception {
      			out.collect(value);
      		}
      	}
      }
      

      When commenting out the `union` it works.

      Attachments

        Activity

          People

            Unassigned Unassigned
            aljoscha Aljoscha Krettek
            Votes:
            0 Vote for this issue
            Watchers:
            6 Start watching this issue

            Dates

              Created:
              Updated:

              Time Tracking

                Estimated:
                Original Estimate - Not Specified
                Not Specified
                Remaining:
                Remaining Estimate - 0h
                0h
                Logged:
                Time Spent - 10m
                10m