Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-15577

WindowAggregate RelNodes missing Window specs in digest

    XMLWordPrintableJSON

Details

    Description

      The RelNode's digest (AbstractRelNode.getDigest()), along with its RowType, is used by the Calcite HepPlanner to avoid adding duplicate Vertices to the graph. If an equivalent vertex is already present in the graph, then that vertex is used in place of the newly generated one:
      https://github.com/apache/calcite/blob/branch-1.21/core/src/main/java/org/apache/calcite/plan/hep/HepPlanner.java#L828

      This means that the digest needs to contain all the information necessary to identify a vertex and distinguish it from similar - but not equivalent - vertices.

      In the case of `LogicalWindowAggregation` and `FlinkLogicalWindowAggregation`, the window specs are currently not in the digest, meaning that two aggregations with the same signatures and expressions but different windows are considered equivalent by the planner, which is not correct and will lead to an invalid Physical Plan.

      For instance, the following query would give an invalid plan:

      WITH window_1h AS (
          SELECT HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '1' HOUR) as `timestamp`
          FROM my_table
          GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '1' HOUR)
      ),
      window_2h AS (
          SELECT HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR) as `timestamp`
          FROM my_table
          GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
      )
      (SELECT * FROM window_1h)
      UNION ALL
      (SELECT * FROM window_2h)
      

      The invalid plan generated by the planner is the following (Please note the windows in the two DataStreamGroupWindowAggregates nodes being the same when they should be different):

      DataStreamUnion(all=[true], union all=[timestamp]): rowcount = 200.0, cumulative cost = {800.0 rows, 802.0 cpu, 0.0 io}, id = 176
        DataStreamCalc(select=[w$rowtime AS timestamp]): rowcount = 100.0, cumulative cost = {300.0 rows, 301.0 cpu, 0.0 io}, id = 173
          DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], select=[start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 100.0, cumulative cost = {200.0 rows, 201.0 cpu, 0.0 io}, id = 172
            DataStreamScan(id=[1], fields=[timestamp]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io}, id = 171
        DataStreamCalc(select=[w$rowtime AS timestamp]): rowcount = 100.0, cumulative cost = {300.0 rows, 301.0 cpu, 0.0 io}, id = 175
          DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], select=[start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 100.0, cumulative cost = {200.0 rows, 201.0 cpu, 0.0 io}, id = 174
            DataStreamScan(id=[1], fields=[timestamp]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io}, id = 171
      

      Attachments

        Issue Links

          Activity

            People

              b.hanotte Benoit Hanotte
              b.hanotte Benoit Hanotte
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved:

                Time Tracking

                  Estimated:
                  Original Estimate - Not Specified
                  Not Specified
                  Remaining:
                  Remaining Estimate - 0h
                  0h
                  Logged:
                  Time Spent - 20m
                  20m