Flink / FLINK-30006

Cannot remove columns that are incorrectly considered constants from an Aggregate in streaming


    Description

      In streaming, columns generated by dynamic functions are incorrectly considered constants and removed from an Aggregate by the optimization rule `CoreRules.AGGREGATE_PROJECT_PULL_UP_CONSTANTS` (inside `RelMdPredicates`, only non-deterministic functions are taken into account, which is not sufficient for streaming). In a long-running streaming job the value of a dynamic function such as CURRENT_DATE changes over time, so a column derived from it is not actually constant and must not be removed from the group key.

      An example query:

        @Test
        def testReduceGroupKey(): Unit = {
          util.tableEnv.executeSql("""
                                     |CREATE TABLE t1(
                                     | a int,
                                     | b varchar,
                                     | cat VARCHAR,
                                     | gmt_date DATE,
                                     | cnt BIGINT,
                                     | PRIMARY KEY (cat) NOT ENFORCED
                                     |) WITH (
                                     | 'connector' = 'values'
                                     |)
                                     |""".stripMargin)
          util.verifyExecPlan(s"""
                                 |SELECT
                                 |     cat, gmt_date, SUM(cnt), count(*)
                                 |FROM t1
                                 |WHERE gmt_date = current_date
                                 |GROUP BY cat, gmt_date
                                 |""".stripMargin)
        }
      

      The wrong plan:

      Calc(select=[cat, CAST(CURRENT_DATE() AS DATE) AS gmt_date, EXPR$2, EXPR$3])
      +- GroupAggregate(groupBy=[cat], select=[cat, SUM(cnt) AS EXPR$2, COUNT(*) AS EXPR$3])
         +- Exchange(distribution=[hash[cat]])
            +- Calc(select=[cat, cnt], where=[=(gmt_date, CURRENT_DATE())])
               +- TableSourceScan(table=[[default_catalog, default_database, t1, filter=[], project=[cat, cnt, gmt_date], metadata=[]]], fields=[cat, cnt, gmt_date])
      

      The expected plan:

      GroupAggregate(groupBy=[cat, gmt_date], select=[cat, gmt_date, SUM(cnt) AS EXPR$2, COUNT(*) AS EXPR$3])
      +- Exchange(distribution=[hash[cat, gmt_date]])
         +- Calc(select=[cat, gmt_date, cnt], where=[(gmt_date = CURRENT_DATE())])
            +- TableSourceScan(table=[[default_catalog, default_database, t1, filter=[], project=[cat, gmt_date, cnt], metadata=[]]], fields=[cat, gmt_date, cnt])
      

      In addition to fixing this issue, we need to systematically check all optimization rules used in streaming to avoid similar problems.
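
      For illustration, here is a minimal sketch (in Scala, against Calcite's RexVisitorImpl and SqlOperator#isDynamicFunction) of the kind of check a streaming-aware rule would additionally need: besides non-deterministic calls, calls to dynamic functions such as CURRENT_DATE must also be treated as non-constant. The helper object name is hypothetical, and this is not the actual fix.

      import org.apache.calcite.rex.{RexCall, RexNode, RexVisitorImpl}

      // Hypothetical helper (not part of Flink): returns true if the given
      // expression contains a call to a dynamic function such as CURRENT_DATE
      // or CURRENT_TIMESTAMP. A streaming-aware constant check would have to
      // treat such expressions as non-constant, in addition to the existing
      // non-deterministic check.
      object DynamicFunctionDetector {

        def containsDynamicFunction(node: RexNode): Boolean = {
          var found = false
          node.accept(new RexVisitorImpl[Unit](true) {
            override def visitCall(call: RexCall): Unit = {
              // SqlOperator#isDynamicFunction is true for CURRENT_DATE and friends,
              // whereas isDeterministic() alone does not flag them.
              if (call.getOperator.isDynamicFunction) {
                found = true
              }
              super.visitCall(call)
            }
          })
          found
        }
      }

      With such a check, the predicate gmt_date = CURRENT_DATE() in the query above would be flagged, and gmt_date would keep being grouped on instead of being reduced to a constant.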


            People

              Assignee: lincoln lee (lincoln.86xy)
              Reporter: lincoln lee (lincoln.86xy)
              Votes: 0
              Watchers: 5
