Details
Type: Sub-task
Status: Closed
Priority: Major
Resolution: Fixed
Description
@Test
def testTumble_ProjectionPushDown(): Unit = {
  // TODO: [b, c, e, proctime] are never used, should be pruned
  val sql =
    """
      |SELECT
      |   a,
      |   window_start,
      |   window_end,
      |   count(*),
      |   sum(d)
      |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
      |GROUP BY a, window_start, window_end
    """.stripMargin
  util.verifyRelPlan(sql)
}
For the above test, currently we get the following plan:
Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4])
+- WindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[rowtime], size=[15 min])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, start('w$) AS window_start, end('w$) AS window_end])
   +- Exchange(distribution=[hash[a]])
      +- Calc(select=[a, d, rowtime])
         +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
            +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS proctime])
               +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
The planner should be able to prune the unused fields and produce the following plan:
Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4])
+- WindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[rowtime], size=[15 min])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, start('w$) AS window_start, end('w$) AS window_end])
   +- Exchange(distribution=[hash[a]])
      +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
         +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, d, rowtime])
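The reason is that we don't transpose Project and WindowTableFunction in the logical phase, so the pruning Project stays above the LogicalTableFunctionScan and never reaches the source: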
LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)])
+- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3])
   +- LogicalTableFunctionScan(invocation=[TUMBLE($6, DESCRIPTOR($5), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) window_time)])
      +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6])
         +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)])
            +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()])
               +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
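To make the missing transposition concrete, here is a rough sketch of the index bookkeeping it would involve. This is only an illustration and not the planner's actual rule: it uses no Flink or Calcite API, and the names `WindowTvfPruningSketch`, `requiredInputIndices` and `remap` are hypothetical. The field order and the indices ($0, $3, $5, $7, $8) are taken from the logical plan in this description.

```scala
object WindowTvfPruningSketch {
  // Input fields of the TUMBLE table function, in the order shown in the logical plan.
  val inputFields: Vector[String] =
    Vector("a", "b", "c", "d", "e", "rowtime", "proctime")

  // Columns the window table function appends after its input fields.
  val windowFields: Vector[String] =
    Vector("window_start", "window_end", "window_time")

  /** Input indices that must be kept below the table function:
    * everything the upper Project reads from the input, plus the time column
    * the window itself needs (assumption: nothing else is referenced).
    */
  def requiredInputIndices(projected: Seq[Int], timeColIndex: Int): Vector[Int] = {
    val fromProject = projected.filter(_ < inputFields.size)
    (fromProject :+ timeColIndex).distinct.sorted.toVector
  }

  /** Rewrites the upper Project's references against the pruned row type.
    * Input references move to their position in `kept`; window columns
    * shift down to start right after the kept input fields.
    */
  def remap(projected: Seq[Int], kept: Vector[Int]): Seq[Int] =
    projected.map { i =>
      if (i < inputFields.size) kept.indexOf(i)
      else kept.size + (i - inputFields.size)
    }
}
```

For the query above, the upper Project reads `Seq(0, 7, 8, 3)` and the time column is index 5, so the kept input fields are `a`, `d` and `rowtime`, which matches the `fields=[a, d, rowtime]` scan in the expected plan.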