XMLWordPrintableJSON

Details

    Description

        /**
         * Plan test for a TUMBLE window aggregation over MyTable.
         *
         * The query only references `a`, `d` and the `rowtime` descriptor column (plus the
         * window_start / window_end columns produced by TUMBLE), so the remaining source
         * columns are candidates for projection push-down.
         */
        @Test
        def testTumble_ProjectionPushDown(): Unit = {
          // TODO: [b, c, e, proctime] are never used, should be pruned
          // NOTE: the literal below is whitespace-sensitive after stripMargin; keep its
          // lines (including the closing delimiter's indent) at their current columns.
          util.verifyRelPlan(
            """
              |SELECT
              |   a,
              |   window_start,
              |   window_end,
              |   count(*),
              |   sum(d)
              |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
              |GROUP BY a, window_start, window_end
            """.stripMargin)
        }
      

      For the test above, we currently get the following plan:

      Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4])
      +- WindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[rowtime], size=[15 min])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, start('w$) AS window_start, end('w$) AS window_end])
         +- Exchange(distribution=[hash[a]])
            +- Calc(select=[a, d, rowtime])
               +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
                  +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS proctime])
                     +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
      

      The planner should be able to prune the unused fields (b, c, e and proctime) and produce the following plan:

      Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4])
      +- WindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[rowtime], size=[15 min])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, start('w$) AS window_start, end('w$) AS window_end])
         +- Exchange(distribution=[hash[a]])
            +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
               +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, d, rowtime])
      

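      The root cause is that we do not transpose the Project and the WindowTableFunction in the logical phase. As the logical plan below shows, the Project above the LogicalTableFunctionScan only uses a, d, window_start and window_end. However, that Project is not pushed below the table function, so the unused columns b, c, e and proctime are still carried all the way down to the source: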

      LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)])
      +- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3])
         +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($5), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) window_time)])
            +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
                  +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
                     +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
      

      Attachments

        Issue Links

          Activity

            People

              jingzhang Jing Zhang
              jark Jark Wu
              Votes:
              0 Vote for this issue
              Watchers:
              6 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: