Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
None
Description
As raised in this thread, the query created by the following code will throw a calcite "mismatch type" (Timestamp(3) and TimeIndicator) exception.
String sql1 = "select id, eventTs as t1, count(*) over (partition by id order by eventTs rows between 100 preceding and current row) as cnt1 from myTable1"; String sql2 = "select distinct id as r_id, eventTs as t2, count(*) over (partition by id order by eventTs rows between 50 preceding and current row) as cnt2 from myTable2"; Table left = tableEnv.sqlQuery(sql1); Table right = tableEnv.sqlQuery(sql2); left.join(right).where("id === r_id && t1 === t2").select("id, t1").writeToSink(...)
The logical plan is as follows.
LogicalProject(id=[$0], t1=[$1]) LogicalFilter(condition=[AND(=($0, $3), =($1, $4))]) LogicalJoin(condition=[true], joinType=[inner]) LogicalAggregate(group=[{0, 1, 2}]) LogicalWindow(window#0=[window(partition {0} order by [1] rows between $2 PRECEDING and CURRENT ROW aggs [COUNT()])]) LogicalProject(id=[$0], eventTs=[$3]) LogicalTableScan(table=[[_DataStreamTable_0]]) LogicalAggregate(group=[{0, 1, 2}]) LogicalWindow(window#0=[window(partition {0} order by [1] rows between $2 PRECEDING and CURRENT ROW aggs [COUNT()])]) LogicalProject(id=[$0], eventTs=[$3]) LogicalTableScan(table=[[_DataStreamTable_0]])
That is because the the rowtime field after an aggregation will be materialized while the RexInputRef type for the filter's operands (t1 === t2) is still TimeIndicator. We should make them unified.
Attachments
Issue Links
- relates to
-
FLINK-10210 Time indicators are not always materialised for LogicalCorrelate
- Open
-
FLINK-10211 Time indicators are not correctly materialized for LogicalJoin
- Closed
- links to