Flink / FLINK-29119

Clarify how join hints work with CTEs

Details

    • Type: Improvement
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved

    Description

      Use the source tables from flink-tpch-test.

      A join hint works fine when the CTE's expression name (q1) is referenced only once:

      
      Flink SQL> explain with q1 as (SELECT
      >   p_name,
      >   p_mfgr,
      >   p_brand,
      >   p_type,
      >   s_name,
      >   s_address
      > FROM
      >   part,
      >   supplier
      > WHERE p_partkey = s_suppkey)
      >
      > SELECT /*+ SHUFFLE_MERGE(part,supplier)  */ * from q1;
      == Abstract Syntax Tree ==
      LogicalProject(p_name=[$1], p_mfgr=[$2], p_brand=[$3], p_type=[$4], s_name=[$10], s_address=[$11])
      +- LogicalFilter(condition=[=($0, $9)])
         +- LogicalJoin(condition=[true], joinType=[inner], joinHints=[[[SHUFFLE_MERGE inheritPath:[0, 0] options:[part, supplier]]]])
            :- LogicalTableScan(table=[[default_catalog, default_database, part]], hints=[[[ALIAS inheritPath:[] options:[part]]]])
            +- LogicalTableScan(table=[[default_catalog, default_database, supplier]], hints=[[[ALIAS inheritPath:[] options:[supplier]]]])
      
      == Optimized Physical Plan ==
      Calc(select=[p_name, p_mfgr, p_brand, p_type, s_name, s_address])
      +- SortMergeJoin(joinType=[InnerJoin], where=[=(p_partkey, s_suppkey)], select=[p_partkey, p_name, p_mfgr, p_brand, p_type, s_suppkey, s_name, s_address])
         :- Exchange(distribution=[hash[p_partkey]])
         :  +- TableSourceScan(table=[[default_catalog, default_database, part, project=[p_partkey, p_name, p_mfgr, p_brand, p_type], metadata=[]]], fields=[p_partkey, p_name, p_mfgr, p_brand, p_type])
         +- Exchange(distribution=[hash[s_suppkey]])
            +- TableSourceScan(table=[[default_catalog, default_database, supplier, project=[s_suppkey, s_name, s_address], metadata=[]]], fields=[s_suppkey, s_name, s_address])
      
      == Optimized Execution Plan ==
      Calc(select=[p_name, p_mfgr, p_brand, p_type, s_name, s_address])
      +- SortMergeJoin(joinType=[InnerJoin], where=[(p_partkey = s_suppkey)], select=[p_partkey, p_name, p_mfgr, p_brand, p_type, s_suppkey, s_name, s_address])
         :- Exchange(distribution=[hash[p_partkey]])
         :  +- TableSourceScan(table=[[default_catalog, default_database, part, project=[p_partkey, p_name, p_mfgr, p_brand, p_type], metadata=[]]], fields=[p_partkey, p_name, p_mfgr, p_brand, p_type])
         +- Exchange(distribution=[hash[s_suppkey]])
            +- TableSourceScan(table=[[default_catalog, default_database, supplier, project=[s_suppkey, s_name, s_address], metadata=[]]], fields=[s_suppkey, s_name, s_address])
      
      

      but it raises an error when the same CTE is also referenced through an alias (q1 q2 in the query below):

      
      Flink SQL> explain with q1 as (SELECT
      >   p_name,
      >   p_mfgr,
      >   p_brand,
      >   p_type,
      >   s_name,
      >   s_address
      > FROM
      >   part,
      >   supplier
      > WHERE p_partkey = s_suppkey)
      >
      > SELECT /*+ SHUFFLE_MERGE(part,supplier)  */ * from q1, q1 q2 where q1.p_name = q2.p_name;
      [ERROR] Could not execute SQL statement. Reason:
      org.apache.flink.table.api.ValidationException: The options of following hints cannot match the name of input tables or views:
      `SHUFFLE_MERGE(part, supplier)`
      
      

      The expected behavior of join hints with CTEs should be clarified in the documentation.


            People

              Assignee: Unassigned
              Reporter: lincoln lee (lincoln.86xy)
              Votes: 0
              Watchers: 2
