Details
-
Bug
-
Status: Open
-
Not a Priority
-
Resolution: Unresolved
-
1.11.1, 1.12.0
-
None
Description
When using assignTimestampAndWatermarks function separately, if it can satisfied the chaining condition, it will chained erroneously with previous operator.
For example, I have a code like bellow:
// code placeholder
DataStream<UserAction> ds = dataSource.map(xxx).setParallelism(1);
ds.assignTimestampAndWatermarks(xxxx);
DataStream<UserAction> rsStream = ds.keyBy(xxx).timeWindow(xxx).sum(xxx).setParallelism(2);
I will get the JobGraph bellow.
If I change my code to this:
// code placeholder
DataStream<UserAction> ds = dataSource.map(xxx).setParallelism(1);
DataStream<UserAction> watermarkStream = ds.assignTimestampAndWatermarks(xxxx);
DataStream<UserAction> rsStream = watermarkStream.keyBy(xxx).timeWindow(xxx).sum(xxx).setParallelism(2);
I will get the same JobGraph, at the same time the actual execution result is not the same.
I think the first JobGraph have some problem and should not show like that, maybe the Timestamps/Watermarks operator should show separately and should not chaining with previous operator.