Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
1.9.0, 1.10.0
Description
@Test def testAllEventTimeTumblingGroupWindowOverTime(): Unit = { val util = streamTestUtil() val table = util.addDataStream[(Long, Int, String)]( "T1", 'long, 'int, 'string, 'rowtime.rowtime) val windowedTable = table .window(Tumble over 5.millis on 'rowtime as 'w) .groupBy('w) .select('int.count) util.verifyPlan(windowedTable) }
currently, it's physical plan is
HashWindowAggregate(window=[TumblingGroupWindow], select=[Final_COUNT(count$0) AS EXPR$0]) +- Exchange(distribution=[single]) +- LocalHashWindowAggregate(window=[TumblingGroupWindow], select=[Partial_COUNT(int) AS count$0]) +- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(long, int, string)]]], fields=[long, int, string])
we know nothing about the TumblingGroupWindow except its name. the expected plan is
HashWindowAggregate(window=[TumblingGroupWindow('w, long, 5)], select=[Final_COUNT(count$0) AS EXPR$0]) +- Exchange(distribution=[single]) +- LocalHashWindowAggregate(window=[TumblingGroupWindow('w, long, 5)], select=[Partial_COUNT(int) AS count$0]) +- TableSourceScan(table=[[default_catalog, default_database, Table1, source: [TestTableSource(long, int, string)]]], fields=[long, int, string])
Attachments
Issue Links
- links to