Flink / FLINK-34910

Can not plan window join without projections

Details

    Description

      When running:

        @Test
        def testWindowJoinWithoutProjections(): Unit = {
          val sql =
            """
              |SELECT *
              |FROM
              |  TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)) AS L
              |JOIN
              |  TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)) AS R
              |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a
            """.stripMargin
          util.verifyRelPlan(sql)
        }
      

      It fails with:

      FlinkLogicalCalc(select=[a, b, c, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, window_start, window_end, window_time, a0, b0, c0, rowtime0, PROCTIME_MATERIALIZE(proctime0) AS proctime0, window_start0, window_end0, window_time0])
      +- FlinkLogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{}])
         :- FlinkLogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR($3), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
         :  +- FlinkLogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)])
         :     +- FlinkLogicalCalc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
         :        +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, rowtime])
         +- FlinkLogicalTableFunctionScan(invocation=[TUMBLE(DESCRIPTOR(CAST($3):TIMESTAMP(3)), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
            +- FlinkLogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)])
               +- FlinkLogicalCalc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
                  +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, rowtime])
      
      Failed to get time attribute index from DESCRIPTOR(CAST($3):TIMESTAMP(3)). This is a bug, please file a JIRA issue.
      Please check the documentation for the set of currently supported SQL features.
      

      In prior versions, the same query failed for a different reason, an ambiguous rowtime column; that problem was fixed by FLINK-32648. In versions before 1.19, window table functions were scoped incorrectly because they did not extend Calcite's SqlWindowTableFunction and the scoping implemented in SqlValidatorImpl#convertFrom was incorrect.
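
      For contrast, here is a sketch of the same join with each windowing TVF wrapped in a subquery, which is the shape used in the Flink window join documentation. The explicit column lists, the test name and the alias rb are hypothetical additions for illustration. util, MyTable and MyTable2 are assumed to come from the same test class as the reproducer above. Whether this form avoids the failure on affected versions has not been verified here, and verifyRelPlan would still need an expected plan to compare against.

        @Test
        def testWindowJoinWithProjections(): Unit = {
          // Hypothetical variant of the reproducer: both sides project an
          // explicit column list inside a subquery instead of joining the
          // TUMBLE table functions directly.
          val sql =
            """
              |SELECT L.a, L.b, L.window_start, L.window_end, R.b AS rb
              |FROM (
              |  SELECT a, b, window_start, window_end
              |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
              |) L
              |JOIN (
              |  SELECT a, b, window_start, window_end
              |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
              |) R
              |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a
            """.stripMargin
          util.verifyRelPlan(sql)
        }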

            People

              dwysakowicz Dawid Wysakowicz