FLINK-33365

Missing filter condition in execution plan containing lookup join with mysql jdbc connector

Details

    Description

      Create a table in Flink with sql-client.sh:

      CREATE TABLE default_catalog.default_database.a (
        ip string, 
        proctime as proctime()
      ) 
      WITH (
        'connector' = 'datagen'
      );

      Create a table in MySQL:

      create table b (
        ip varchar(20), 
        type int
      );  

       

      With Flink 1.17.1 / 1.18.0 and flink-connector-jdbc-3.1.1-1.17.jar, execute the following in sql-client.sh:

      EXPLAIN
      SELECT *
      FROM default_catalog.default_database.a
      LEFT JOIN bnpmp_mysql_test.gem_tmp.b FOR SYSTEM_TIME AS OF a.proctime
        ON b.type = 0 AND a.ip = b.ip;

      This returns the execution plan:

      ...
      == Optimized Execution Plan ==
      Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
      +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
         +- Calc(select=[ip, PROCTIME() AS proctime])
            +- TableSourceScan(table=[[default_catalog, default_database, a]], fields=[ip])

       
      Execute the same SQL in sql-client.sh with Flink 1.17.1 / 1.18.0 and flink-connector-jdbc-3.0.0-1.16.jar.

      This returns the execution plan:

      ...
      == Optimized Execution Plan ==
      Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type])
      +- LookupJoin(table=[bnpmp_mysql_test.gem_tmp.b], joinType=[LeftOuterJoin], lookup=[type=0, ip=ip], where=[(type = 0)], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, CAST(ip AS VARCHAR(2147483647)) AS ip0])
         +- Calc(select=[ip, PROCTIME() AS proctime])
            +- TableSourceScan(table=[[default_catalog, default_database, a]], fields=[ip]) 

      With flink-connector-jdbc-3.1.1-1.17.jar, the join condition omits the type = 0 filter:

      lookup=[ip=ip]

      With flink-connector-jdbc-3.0.0-1.16.jar, the join condition includes the type = 0 filter:

      lookup=[type=0, ip=ip], where=[(type = 0)]

       

      In our real-world production environment, this leads to incorrect data output.
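
      To illustrate why the missing filter matters, here is a minimal sketch in Python using an in-memory SQLite database (not Flink or MySQL). It assumes the 3.1.1-1.17 plan behaves exactly as it reads, that is, it looks up b by ip only and projects type as the constant 0. The sample rows and variable names are made up for illustration.

```python
import sqlite3

# In-memory stand-ins for the Flink table `a` and the MySQL table `b`.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE a (ip TEXT);
    CREATE TABLE b (ip TEXT, type INTEGER);
    INSERT INTO a VALUES ('10.0.0.1'), ('10.0.0.2'), ('10.0.0.3');
    -- Hypothetical data: only 10.0.0.2 has a row with type = 0.
    INSERT INTO b VALUES ('10.0.0.1', 1), ('10.0.0.2', 0);
""")

# Semantics of the original query (matches the 3.0.0-1.16 plan):
# the join condition contains both the key match and type = 0.
expected = conn.execute("""
    SELECT a.ip, b.ip, b.type
    FROM a
    LEFT JOIN b ON b.type = 0 AND a.ip = b.ip
    ORDER BY a.ip
""").fetchall()

# Assumed reading of the 3.1.1-1.17 plan: lookup on ip only,
# with type replaced by the constant 0 on the lookup side.
modeled = conn.execute("""
    SELECT a.ip, b0.ip, b0.type
    FROM a
    LEFT JOIN (SELECT ip, CAST(0 AS INTEGER) AS type FROM b) AS b0
      ON a.ip = b0.ip
    ORDER BY a.ip
""").fetchall()

print("expected:", expected)
print("modeled: ", modeled)
```

      Under that assumption, 10.0.0.1 is joined to a b row whose stored type is 1 and is reported with type 0, whereas the original query pads that row with NULLs.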

       

       

      Attachments

        1. flink-connector-jdbc-3.1.1-1.17.png
          125 kB
          macdoor615
        2. flink-connector-jdbc-3.0.0-1.16.png
          209 kB
          macdoor615

            People

              davidradl david radley
              macdoor615 macdoor615