Flink / FLINK-25396

lookupjoin source table for pre-partitioning


Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: Table SQL / Planner
    • Labels: None

    Description

      When performing a lookup join against an external system, the input should first be partitioned by the join key so that all records with the same key are handled by the same task. This reduces the number of lookup queries, and each task then caches only its own subset of the external data instead of potentially all of it.
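      To make the effect concrete, here is a small standalone simulation, not Flink code. It assumes that without an Exchange the records reach the lookup tasks round-robin, that each task keeps an unbounded cache where a miss costs one external query, and it uses a simplified hypothetical partitioner (`hashPartition`, a plain `floorMod` of `hashCode()`) rather than Flink's actual hash distribution. All class and method names are illustrative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class LookupPrePartitionSketch {

    /** Outcome of one simulated run. */
    static final class Result {
        final int externalQueries; // cache misses, i.e. queries sent to the external system
        final int[] cacheSizes;    // number of distinct keys cached by each task

        Result(int externalQueries, int[] cacheSizes) {
            this.externalQueries = externalQueries;
            this.cacheSizes = cacheSizes;
        }
    }

    /** Hypothetical, simplified hash partitioner; not Flink's actual implementation. */
    static int hashPartition(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    static Result simulate(List<String> joinKeys, int parallelism, boolean prePartition) {
        List<Set<String>> caches = new ArrayList<>();
        for (int t = 0; t < parallelism; t++) {
            caches.add(new HashSet<>());
        }
        int externalQueries = 0;
        for (int i = 0; i < joinKeys.size(); i++) {
            String key = joinKeys.get(i);
            // Assumption: without an Exchange, records reach the tasks round-robin.
            int task = prePartition ? hashPartition(key, parallelism) : i % parallelism;
            // Unbounded per-task cache: the first time a task sees a key it queries
            // the external system and caches the row; later lookups are cache hits.
            if (caches.get(task).add(key)) {
                externalQueries++;
            }
        }
        int[] sizes = caches.stream().mapToInt(Set::size).toArray();
        return new Result(externalQueries, sizes);
    }

    public static void main(String[] args) {
        // 1000 records whose join key cycles through 10 distinct values.
        List<String> joinKeys = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            joinKeys.add("word-" + (i % 10));
        }
        for (boolean prePartition : new boolean[] {false, true}) {
            Result r = simulate(joinKeys, 4, prePartition);
            System.out.println("prePartition=" + prePartition
                    + " externalQueries=" + r.externalQueries
                    + " cacheSizes=" + Arrays.toString(r.cacheSizes));
        }
    }
}
```

      With 4 tasks, the round-robin run issues 20 external queries and every task caches 5 of the 10 keys, while the pre-partitioned run issues 10 queries and each key is cached by exactly one task. The cost is an extra network shuffle, and a heavily skewed join key would concentrate load on one task.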

      Example:

      select * from sourceTable t1 LEFT JOIN lookuptable FOR SYSTEM_TIME AS OF t1.proctime as t2 ON t1.msg = t2.word

      The execution plan looks like this:

      == Optimized Execution Plan ==
      Calc(select=[topic, offset, rowtime, msg, uid, PROCTIME_MATERIALIZE(proctime) AS proctime, word])
      +- LookupJoin(table=[default_catalog.default_database.hbaselookup], joinType=[LeftOuterJoin], async=[false], lookup=[word=msg], select=[topic, offset, rowtime, msg, uid, proctime, word])
         +- Calc(select=[CAST(topic) AS topic, CAST(offset) AS offset, Reinterpret(rowtime) AS rowtime, msg, uid, PROCTIME() AS proctime])
            +- TableSourceScan(table=[[default_catalog, default_database, test, watermark=[-($0, 10000:INTERVAL SECOND)]]], fields=[rowtime, msg, uid, topic, offset])
      

      In my optimization, I added a hint option (lookup.join.pre-partition) and a planner rule that generates an Exchange, so that the input is pre-partitioned by the join key when external data is fetched synchronously:

      select * from test t1 LEFT JOIN hbaselookup /*+ OPTIONS('lookup.join.pre-partition'='true') */ FOR SYSTEM_TIME AS OF t1.proctime as t2 ON t1.msg = t2.word

      == Optimized Execution Plan ==
      Calc(select=[topic, offset, rowtime, msg, uid, PROCTIME_MATERIALIZE(proctime) AS proctime, word])
      +- LookupJoin(table=[default_catalog.default_database.hbaselookup], joinType=[LeftOuterJoin], async=[false], lookup=[word=msg], select=[topic, offset, rowtime, msg, uid, proctime, word])
         +- Exchange(distribution=[hash[msg]])
            +- Calc(select=[CAST(topic) AS topic, CAST(offset) AS offset, Reinterpret(rowtime) AS rowtime, msg, uid, PROCTIME() AS proctime])
               +- TableSourceScan(table=[[default_catalog, default_database, test, watermark=[-($0, 10000:INTERVAL SECOND)]]], fields=[rowtime, msg, uid, topic, offset])
       

       


            People

              Assignee: Unassigned
              Reporter: HunterXHunter
              Votes: 0
              Watchers: 3
