Details
Type: Bug
Status: Open
Priority: Not a Priority
Resolution: Unresolved
Affects Version/s: 1.2.0, 1.3.0
Fix Version/s: None
Description
In this example job, the WatermarkObserver never receives any watermarks:
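import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;

public class WatermarkTest {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Ingestion time with a 1000 ms auto-watermark interval.
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        env.getConfig().setAutoWatermarkInterval(1000);
        env.setParallelism(1);

        // Source that emits one record every 800 ms and never terminates.
        DataStreamSource<String> input = env.addSource(new SourceFunction<String>() {
            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                while (true) {
                    ctx.collect("hello!");
                    Thread.sleep(800);
                }
            }

            @Override
            public void cancel() {
            }
        });

        // Union the stream with itself, then pass it through the observer.
        input.union(input)
            .flatMap(new IdentityFlatMap())
            .transform("WatermarkOp", BasicTypeInfo.STRING_TYPE_INFO, new WatermarkObserver());

        env.execute();
    }

    // Prints every element and every watermark that reaches the operator.
    public static class WatermarkObserver
            extends AbstractStreamOperator<String>
            implements OneInputStreamOperator<String, String> {

        @Override
        public void processElement(StreamRecord<String> element) throws Exception {
            System.out.println("GOT ELEMENT: " + element);
        }

        @Override
        public void processWatermark(Watermark mark) throws Exception {
            super.processWatermark(mark);
            System.out.println("GOT WATERMARK: " + mark);
        }
    }

    // Forwards each record unchanged.
    private static class IdentityFlatMap extends RichFlatMapFunction<String, String> {
        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            out.collect(value);
        }
    }
}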
When the `union` call is commented out, the watermarks arrive as expected.