Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-21165

Timestamps/Watermarks chained erroneously with previous operator

    XMLWordPrintableJSON

Details

    Description

      When using assignTimestampAndWatermarks function separately, if it can satisfied the chaining condition, it will chained erroneously with previous operator.

      For example, I have a code like bellow: 

      // code placeholder
      DataStream<UserAction> ds = dataSource.map(xxx).setParallelism(1);
      
      ds.assignTimestampAndWatermarks(xxxx);
      
      DataStream<UserAction> rsStream = ds.keyBy(xxx).timeWindow(xxx).sum(xxx).setParallelism(2);

      I will get the JobGraph bellow.

      If I change my code to this:

      // code placeholder
      DataStream<UserAction> ds = dataSource.map(xxx).setParallelism(1); 
      
      DataStream<UserAction> watermarkStream = ds.assignTimestampAndWatermarks(xxxx); 
      
      DataStream<UserAction> rsStream = watermarkStream.keyBy(xxx).timeWindow(xxx).sum(xxx).setParallelism(2);
      

      I will get the same JobGraph, at the same time the actual execution result is not the same.

      I think the first JobGraph have some problem and should not show like that, maybe the Timestamps/Watermarks operator should show separately and should not chaining with previous operator.

      Attachments

        1. Lark20210127-203541.png
          13 kB
          zlzhang0122

        Activity

          People

            Unassigned Unassigned
            zlzhang0122 zlzhang0122
            Votes:
            0 Vote for this issue
            Watchers:
            1 Start watching this issue

            Dates

              Created:
              Updated: