FLINK-10734

Temporal joins on heavily filtered tables might fail in planning


Details

    • Type: Bug
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 1.7.0
    • Fix Version/s: None
    • Component/s: Table SQL / Planner
    • Labels: None

    Description

      The following query:

          val sqlQuery =
            """
              |SELECT
              |  o.amount * r.rate AS amount
              |FROM
              |  Orders AS o,
              |  LATERAL TABLE (Rates(o.rowtime)) AS r
              |WHERE r.currency = o.currency
              |""".stripMargin
      

      with Rates defined as follows:

          tEnv.registerTable("EuroRatesHistory", tEnv.scan("RatesHistory").filter('currency === "Euro"))
          tEnv.registerFunction(
            "Rates",
            tEnv.scan("EuroRatesHistory").createTemporalTableFunction('rowtime, 'currency))
      

      fails with:

      org.apache.flink.table.api.ValidationException: Only single column join key is supported. Found [] in [InnerJoin(where: (__TEMPORAL_JOIN_CONDITION(rowtime, rowtime0, currency)), join: (amount, rowtime, currency, rate, rowtime0))]
      
       at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.validateRightPrimaryKey(DataStreamTemporalJoinToCoProcessTranslator.scala:215)
       at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:183)
       at org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalJoinToCoProcessTranslator$TemporalJoinConditionExtractor.visitCall(DataStreamTemporalJoinToCoProcessTranslator.scala:152)
      
      

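      The problem is that the filter condition ('currency === "Euro") interferes with the join condition. Because the right side is known to contain only "Euro", the optimiser rewrites the join predicate r.currency = o.currency into the constant comparison 'Euro' = o.currency and pushes it down to the Orders input, which leaves the temporal join without any equi-join key (hence "Found []" in the exception). Note how the top LogicalFilter(condition=[=($3, $1)]) changes during optimisation and finally disappears: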

      LogicalProject(amount=[*($0, $4)])
        LogicalFilter(condition=[=($3, $1)])
          LogicalTemporalTableJoin(condition=[__TEMPORAL_JOIN_CONDITION($2, $5, $3)], joinType=[inner])
            LogicalTableScan(table=[[_DataStreamTable_0]])
            LogicalFilter(condition=[=($0, _UTF-16LE'Euro')])
              LogicalTableScan(table=[[_DataStreamTable_1]])
      
      LogicalProject(amount=[*($0, $4)])
        LogicalFilter(condition=[=(_UTF-16LE'Euro', $1)])
          LogicalProject(amount=[$0], currency=[$1], rowtime=[$2], currency0=[$3], rate=[$4], rowtime0=[CAST($5):TIMESTAMP(3) NOT NULL])
            LogicalTemporalTableJoin(condition=[__TEMPORAL_JOIN_CONDITION($2, $5, $3)], joinType=[inner])
              LogicalTableScan(table=[[_DataStreamTable_0]])
              LogicalFilter(condition=[=($0, _UTF-16LE'Euro')])
                LogicalTableScan(table=[[_DataStreamTable_1]])
      
      FlinkLogicalCalc(expr#0..4=[{inputs}], expr#5=[*($t0, $t3)], amount=[$t5])
        FlinkLogicalTemporalTableJoin(condition=[__TEMPORAL_JOIN_CONDITION($1, $4, $2)], joinType=[inner])
          FlinkLogicalCalc(expr#0..2=[{inputs}], expr#3=[_UTF-16LE'Euro'], expr#4=[=($t3, $t1)], amount=[$t0], rowtime=[$t2], $condition=[$t4])
            FlinkLogicalNativeTableScan(table=[[_DataStreamTable_0]])
          FlinkLogicalCalc(expr#0..2=[{inputs}], expr#3=[_UTF-16LE'Euro'], expr#4=[=($t0, $t3)], proj#0..2=[{exprs}], $condition=[$t4])
            FlinkLogicalNativeTableScan(table=[[_DataStreamTable_1]])
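
      The effect visible in the plans above can be illustrated with a small stand-alone model. This is only a sketch of the idea, not Flink or Calcite code: the types Expr, Col, Lit and Eq and the helpers propagate and equiJoinKeys are hypothetical names introduced for illustration. It assumes the optimiser substitutes a column that is known to be constant with its literal, and that an equi-join key requires one column from each join input.

```scala
// Toy model of the simplification; none of these names exist in Flink or Calcite.
object JoinKeySketch {
  sealed trait Expr
  // side is "left" (Orders) or "right" (Rates)
  final case class Col(side: String, name: String) extends Expr
  final case class Lit(value: String) extends Expr
  final case class Eq(a: Expr, b: Expr)

  // Replace every column that is known to be constant with its literal value.
  def propagate(preds: List[Eq], known: Map[Col, Lit]): List[Eq] = {
    def sub(e: Expr): Expr = e match {
      case c: Col => known.getOrElse(c, c)
      case other  => other
    }
    preds.map(p => Eq(sub(p.a), sub(p.b)))
  }

  // An equality is usable as a join key only if it compares a left column with a right column.
  def equiJoinKeys(preds: List[Eq]): List[(Col, Col)] = preds.collect {
    case Eq(l @ Col("left", _), r @ Col("right", _)) => (l, r)
    case Eq(r @ Col("right", _), l @ Col("left", _)) => (l, r)
  }

  val oCurrency = Col("left", "currency")  // o.currency
  val rCurrency = Col("right", "currency") // r.currency, filtered to "Euro"

  // WHERE r.currency = o.currency
  val original = List(Eq(rCurrency, oCurrency))
  // After using the fact r.currency = 'Euro': 'Euro' = o.currency
  val simplified = propagate(original, Map(rCurrency -> Lit("Euro")))

  def main(args: Array[String]): Unit = {
    println(equiJoinKeys(original))   // one key pair
    println(equiJoinKeys(simplified)) // no key pair left
  }
}
```

      In this model the original predicate yields one join key, while the simplified predicate only constrains the left input and yields none, which mirrors the empty key list reported by validateRightPrimaryKey.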
      

      People

        Assignee: Unassigned
        Reporter: Piotr Nowojski (pnowojski)