Flink / FLINK-22955

Lookup join filter push-down results in a mismatched lookup function signature


Details

    Description

      A SQL script like the following can fail with a lookup function signature mismatch exception when the SQL is explained:

      CREATE TEMPORARY VIEW v_vvv AS
      SELECT * FROM MyTable AS T
      JOIN LookupTableAsync1 FOR SYSTEM_TIME AS OF T.proctime AS D
      ON T.a = D.id;
      
      SELECT a,b,id,name
      FROM v_vvv
      WHERE age = 10;

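      The lookup function is:

      import java.util.concurrent.CompletableFuture
      import java.util.{Collection => JCollection}
      import org.apache.flink.table.data.RowData
      import org.apache.flink.table.functions.AsyncTableFunction
      class AsyncTableFunction1 extends AsyncTableFunction[RowData] {
        // accepts exactly one lookup key argument (the join key `id`)
        def eval(resultFuture: CompletableFuture[JCollection[RowData]], a: Integer): Unit = {}
      }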

      The execution plan is:

      LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b, id, name])
      +- LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[age=10, id=a], where=[(age = 10)], select=[a, b, id, name])
         +- Calc(select=[a, b])
            +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
      

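      The "lookup=[age=10, id=a]" entry causes the signature mismatch: the filter age = 10 has been pushed into the lookup keys, so the planner looks for an eval method taking two lookup keys (age and id), while AsyncTableFunction1.eval accepts only one.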
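      To illustrate why the extra pushed-down key breaks the match, here is a minimal, Flink-free Scala sketch. It assumes, for illustration only, that a lookup function matches when it has a public eval method whose parameters are the result future followed by one argument per lookup key, and that both id and age are INT columns (java.lang.Integer). LookupFunctionStub, SignatureCheck and hasEval are hypothetical names; this is not Flink's actual validation code.

```scala
import java.util.concurrent.CompletableFuture
import java.util.{Collection => JCollection}

// Hypothetical stand-in for AsyncTableFunction1 with no Flink dependency:
// same eval shape, with AnyRef in place of RowData.
class LookupFunctionStub {
  def eval(resultFuture: CompletableFuture[JCollection[AnyRef]], a: Integer): Unit = ()
}

// Hypothetical helper; NOT Flink's real signature validation.
object SignatureCheck {
  // True if `clazz` has a public eval(CompletableFuture, Integer x keyCount).
  def hasEval(clazz: Class[_], keyCount: Int): Boolean =
    clazz.getMethods.exists { m =>
      val params = m.getParameterTypes
      m.getName == "eval" &&
      params.length == keyCount + 1 &&
      params.head == classOf[CompletableFuture[_]] &&
      params.tail.forall(_ == classOf[Integer])
    }

  def main(args: Array[String]): Unit = {
    // lookup=[id=a]: one key, matches eval(future, Integer)
    println(hasEval(classOf[LookupFunctionStub], 1)) // prints "true"
    // lookup=[age=10, id=a]: two keys, no eval(future, Integer, Integer) exists
    println(hasEval(classOf[LookupFunctionStub], 2)) // prints "false"
  }
}
```

      With one key the stub's eval matches; once the age key is added there is no two-key overload, which mirrors the reported exception under the stated assumptions.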

       

      However, if I add one more INSERT, it works well:

      SELECT a,b,id,name
      FROM v_vvv
      WHERE age = 30;
      

      The execution plan is:

      == Optimized Execution Plan ==
      LookupJoin(table=[default_catalog.default_database.LookupTableAsync1], joinType=[InnerJoin], async=[true], lookup=[id=a], select=[a, b, c, proctime, rowtime, id, name, age, ts])(reuse_id=[1])
      +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])

      LegacySink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b, id, name])
      +- Calc(select=[a, b, id, name], where=[(age = 10)])
         +- Reused(reference_id=[1])

      LegacySink(name=[`default_catalog`.`default_database`.`appendSink2`], fields=[a, b, id, name])
      +- Calc(select=[a, b, id, name], where=[(age = 30)])
         +- Reused(reference_id=[1])
      
      

      Here the LookupJoin node uses "lookup=[id=a]" (correct) rather than "lookup=[age=10, id=a]" (wrong), and the age filters stay in the downstream Calc nodes.

       

      So, in the "multi insert" case the planner works correctly, but in the "single insert" case it throws an exception.


          People

            Assignee: Unassigned
            Reporter: Cooper Luan (gsavl)
            Votes: 0
            Watchers: 8
