Uploaded image for project: 'Flink'
  1. Flink
  2. FLINK-29138

Project pushdown not work for lookup source

Agile BoardRank to TopRank to BottomAttach filesAttach ScreenshotBulk Copy AttachmentsBulk Move AttachmentsVotersWatch issueWatchersCreate sub-taskConvert to sub-taskLinkCloneLabelsUpdate Comment AuthorReplace String in CommentUpdate Comment VisibilityDelete Comments
    XMLWordPrintableJSON

Details

    Description

      Current tests: LookupJoinTest#testJoinTemporalTableWithProjectionPushDown

      @Test
      def testJoinTemporalTableWithProjectionPushDown(): Unit = {
      val sql =
      """
      |SELECT T.*, D.id
      |FROM MyTable AS T
      |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D
      |ON T.a = D.id
      """.stripMargin
      
      util.verifyExecPlan(sql)
      }
      
      

      the optimized plan doesn't print the selected columns from lookup source, but actually it didn't push the project into lookup source (still select all columns from source), this is not as expected

      <Resource name="optimized exec plan">
      <![CDATA[
      Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id])
      +- LookupJoin(table=[default_catalog.default_database.LookupTable], joinType=[InnerJoin], lookup=[id=a], select=[a, b, c, proctime, rowtime, id])
      +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
      ]]>
      </Resource>
      
      

       

      incorrect intermediate optimization result

      =========  logical_rewrite ========
       optimize result: 
      FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
      :- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
      +- FlinkLogicalSnapshot(period=[$cor0.proctime])
         +- FlinkLogicalCalc(select=[id])
            +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age])
      
      
      =========  time_indicator ========
       optimize result: 
      FlinkLogicalCalc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id])
      +- FlinkLogicalJoin(condition=[=($0, $5)], joinType=[inner])
         :- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
         +- FlinkLogicalSnapshot(period=[$cor0.proctime])
            +- FlinkLogicalCalc(select=[id])
               +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age])
      
      

       

      plan comparison after fix

      Attachments

        Activity

          This comment will be Viewable by All Users Viewable by All Users
          Cancel

          People

            lincoln.86xy lincoln lee
            lincoln.86xy lincoln lee
            Votes:
            0 Vote for this issue
            Watchers:
            7 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved:

              Slack

                Issue deployment