Flink / FLINK-27272

The plan for a query with a local sort is incorrect if the adaptive batch scheduler is enabled


Details

    Description

      Add the following test case to ForwardHashExchangeTest:

        @Test
        public void testRankWithHashShuffle() {
            util.verifyExecPlan(
                    "SELECT * FROM (SELECT a, b, RANK() OVER(PARTITION BY a ORDER BY b) rk FROM T) WHERE rk <= 10");
        }
      

      The resulting plan is:

      Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[b ASC], global=[true], select=[a, b, w0$o0])
      +- Exchange(distribution=[forward])
         +- Sort(orderBy=[a ASC, b ASC])
            +- Exchange(distribution=[hash[a]])
               +- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[b ASC], global=[false], select=[a, b])
                  +- Sort(orderBy=[a ASC, b ASC])
                     +- TableSourceScan(table=[[default_catalog, default_database, T, project=[a, b], metadata=[]]], fields=[a, b])
      

      There should be an additional Exchange(distribution=[forward]) node between the local Rank and the Sort below it. Otherwise, if the adaptive batch scheduler is enabled but operator chaining is disabled, the result may be wrong: the local Rank relies on its input having been sorted by that Sort, so the two operators must run with the same parallelism, but without an explicit forward exchange the adaptive batch scheduler may change their parallelism independently.
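      The failure mode can be illustrated with a toy model in plain Java. This is not Flink code: `LocalRankSketch`, `Row`, `localRank`, `sorted` and `globalRank` are hypothetical names, and the real operators are more involved. The sketch assumes that the local Rank computes RANK() in a single pass and trusts that its input is sorted by (a, b). Two Sort subtasks each hold sorted rows for key a = 1. With a one-to-one (forward) connection every local Rank subtask sees sorted input and the global Rank returns the true top 2. When one local Rank subtask instead reads both Sort outputs (as can happen if its parallelism is changed independently), its input is no longer sorted by b and rows that belong in the final result are dropped before the global Rank ever sees them.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Toy model, NOT Flink code: all names are hypothetical.
public class LocalRankSketch {

    // A row with partition key `a` and order key `b`.
    record Row(int a, int b) {}

    // Single-pass RANK() OVER (PARTITION BY a ORDER BY b), keeping rows with rank <= topN.
    // Like a sort-based local rank, it ASSUMES the input is already sorted by (a, b).
    static List<Row> localRank(List<Row> input, int topN) {
        List<Row> out = new ArrayList<>();
        Integer prevKey = null;
        Integer prevB = null;
        int count = 0;
        int rank = 0;
        for (Row r : input) {
            if (prevKey == null || r.a() != prevKey) {
                // New partition key: reset the counters.
                prevKey = r.a();
                prevB = null;
                count = 0;
                rank = 0;
            }
            count++;
            if (prevB == null || r.b() != prevB) {
                // RANK semantics: ties share a rank; the next distinct value jumps to `count`.
                rank = count;
                prevB = r.b();
            }
            if (rank <= topN) {
                out.add(r);
            }
        }
        return out;
    }

    // Stands in for the Sort operator: order rows by (a, b).
    static List<Row> sorted(List<Row> rows) {
        List<Row> copy = new ArrayList<>(rows);
        copy.sort(Comparator.comparingInt(Row::a).thenComparingInt(Row::b));
        return copy;
    }

    // Stands in for Exchange(hash[a]) + Sort + global Rank: re-sort all candidates, then rank.
    static List<Row> globalRank(List<Row> candidates, int topN) {
        return localRank(sorted(candidates), topN);
    }

    public static void main(String[] args) {
        int topN = 2;
        // Outputs of two parallel Sort subtasks; each one is sorted by (a, b).
        List<Row> sort0 = sorted(List.of(new Row(1, 6), new Row(1, 5)));
        List<Row> sort1 = sorted(List.of(new Row(1, 2), new Row(1, 1)));

        // Forward: local Rank subtask i reads only Sort subtask i.
        List<Row> forward = new ArrayList<>(localRank(sort0, topN));
        forward.addAll(localRank(sort1, topN));

        // Parallelism changed to 1: a single local Rank subtask reads both Sort
        // outputs, so its input is no longer sorted by b within key a = 1.
        List<Row> merged = new ArrayList<>(sort0);
        merged.addAll(sort1);
        List<Row> mismatched = localRank(merged, topN);

        System.out.println("forward:    " + globalRank(forward, topN));    // [Row[a=1, b=1], Row[a=1, b=2]]
        System.out.println("mismatched: " + globalRank(mismatched, topN)); // [Row[a=1, b=5], Row[a=1, b=6]]
    }
}
```

      Only the forward variant yields b = 1 and b = 2; in the mismatched variant the local Rank assigns ranks 3 and 4 to those rows and discards them, so no later stage can recover them.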

      The local sort aggregate has a similar problem.


            People

              godfreyhe godfrey he
              godfreyhe godfrey he
              Votes: 0
              Watchers: 2
