Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-16824 FLIP-132 Temporal Table DDL and Temporal Table Join
  3. FLINK-18548

Support temporal join on temporal table with computed columns

    XMLWordPrintableJSON

Details

    Description

      The reason why Flink does not support computed column in dimension(temporal) table is that calcite has limitation with Snapshot node, the computed column is a Project node upon TableScan which will hit the limitation.

      We can improve calcite to 1.23 to fix this or override Sql2RelConverter to fix this issue.

       

      In Flink 1.10, we bring computed column feature, but I found dimension table do not support this feature.

      public static final String mysqlCurrencyDDL = "CREATE TABLE currency (\n" +
      " currency_id BIGINT,\n" +
      " currency_name STRING,\n" +
      " rate DECIMAL(38, 4),\n" +
      " currency_time TIMESTAMP(3),\n" +
      " country STRING,\n" +
      " timestamp6 TIMESTAMP(6),\n" +
      " currency_next as currency_id + 1,\n" +
      " time6 TIME(6),\n" +
      " gdp DECIMAL(10, 4)\n" +
      ") WITH (\n" +
      " 'connector.type' = 'jdbc',\n" +
      " 'connector.url' = 'jdbc:mysql://localhost:3306/test',\n" +
      " 'connector.username' = 'root'," +
      " 'connector.table' = 'currency',\n" +
      " 'connector.driver' = 'com.mysql.jdbc.Driver',\n" +
      " 'connector.lookup.cache.max-rows' = '500', \n" +
      " 'connector.lookup.cache.ttl' = '10s',\n" +
      " 'connector.lookup.max-retries' = '3'" +
      ")";

       

      //

      //
      Exception in thread "main" java.lang.ClassCastException: org.apache.calcite.rel.logical.LogicalProject cannot be cast to org.apache.calcite.rel.core.TableScanException in thread "main" java.lang.ClassCastException: org.apache.calcite.rel.logical.LogicalProject cannot be cast to org.apache.calcite.rel.core.TableScan at org.apache.calcite.sql2rel.SqlToRelConverter.snapshotTemporalTable(SqlToRelConverter.java:2438) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2062) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005) at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2085) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646) at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181) at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:148) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135) at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:523) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:437) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:343) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) at job.KafkaJoinJdbc2Jdbc.main(KafkaJoinJdbc2Jdbc.java:59)
      Process finished with exit code 1

       

      Attachments

        Issue Links

          Activity

            People

              leonard Leonard Xu
              leonard Leonard Xu
              Votes:
              0 Vote for this issue
              Watchers:
              8 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: