  1. Flink
  2. FLINK-14946

Retraction inference can produce a bad plan in a corner case in the Blink planner


Details

    Description

      The retraction rules can produce a bad plan in some cases. I simplified the case to the following SQL; the complete test case can be found in the attachments.

        val join_sql =
          """
            |SELECT
            |  ll.a AS a,
            |  ll.b AS b,
            |  cnt
            |FROM (
            | SELECT a, b, COUNT(c) AS cnt FROM l GROUP BY a, b
            |) ll
            |JOIN (
            | SELECT a, b FROM r GROUP BY a, b
            |) rr ON
            |(ll.a = rr.a AND ll.b = rr.b)
          """.stripMargin

      !image-2019-11-26-14-52-52-824.png!

        val sqlQuery =
          s"""
             |SELECT a, b_1, SUM(cnt) AS cnt
             |FROM (
             | SELECT *, b AS b_1 FROM (${join_sql})
             |   UNION ALL
             | SELECT *, 'SEA' AS b_1 FROM (${join_sql})
             |) AS total_result
             |GROUP BY a, b_1
          """.stripMargin
      

      The plan is:

      After retraction inference, we expect both join nodes in the above plan to have `AccRetract` as their AccMode. However, only the AccMode of Join1 is correct; the AccMode of Join2 is not what we expect.

      I found that `SetAccModeRule` is never applied to Join2. Before actually applying `SetAccModeRule` to Join2, HepPlanner checks whether the vertex belongs to the DAG, and the result is false, so HepPlanner skips the rule for Join2.


      Here is the detailed process:
      1. Join2 matches `SetUpdatesAsRetractionRule`, which produces an equivalent Join node (called Join2') whose new children carry UpdateAsRetractionTrait with the flag set to true.
      2. The new right child of Join2, which is an Exchange, matches `SetUpdatesAsRetractionRule`, which produces an equivalent Exchange node (called Exchange-right') whose new inputs carry UpdateAsRetractionTrait with the flag set to true.
      3. The new left child of Join2 matches `SetUpdatesAsRetractionRule` and, as in step 2, produces an equivalent node (called Exchange-left').
      4. Join1 matches `SetUpdatesAsRetractionRule` and, as in step 1, produces an equivalent Join node (called Join1').
      5. The new right child of Join1, which is an Exchange, matches `SetUpdatesAsRetractionRule`, which produces an equivalent Exchange node whose new inputs carry UpdateAsRetractionTrait with the flag set to true. However, HepPlanner finds that the digest of the new Exchange is the same as that of Exchange-right' from step 2, so it does not create a new vertex but reuses the vertex that contains Exchange-right'.
      6. The new left child of Join1, which is an Exchange, matches `SetUpdatesAsRetractionRule`, which produces an equivalent Exchange node whose new inputs carry UpdateAsRetractionTrait with the flag set to true. However, HepPlanner finds that the digest of the new Exchange is the same as that of Exchange-left' from step 3, so it does not create a new vertex but reuses the vertex that contains Exchange-left'. In addition, in the `contractVertices` method HepPlanner replaces the input of the Exchange's parent (namely Join1), switching it from the old Exchange to the new Exchange.
         In `updateVertex`, Join1' puts newKey and its vertex into `mapDigestToVertex`. However, the digest of Join1' is exactly the same as the digest of Join2', and `mapDigestToVertex` already contains that key, mapped to the vertex that contains Join2'. So the operation replaces the value of newKey in `mapDigestToVertex`, from Join2' to Join1'.
      7. Join1' matches `SetAccModeRule`, which produces an equivalent Join node (called Join1'') with AccRetract as its AccMode. After applying the rule, HepPlanner runs `collectGarbage`. Join1' is added to sweepSet because it is no longer reachable from the root, so the entry for Join1' is removed from `mapDigestToVertex`. Because Join1' and Join2' share a digest, this also removes the only entry that Join2' could be found under.
      8. Join2' matches `SetAccModeRule`, but HepPlanner concludes that Join2' does not belong to the DAG because `mapDigestToVertex` no longer contains the key of Join2'.
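
      The sequence above can be reproduced in a few lines with a toy model. This is only a sketch, not Calcite's code: `Vertex`, `DigestCollisionDemo`, and the digest strings are hypothetical, and only `mapDigestToVertex` and the idea of a digest-based membership check come from the description.

```scala
import scala.collection.mutable

// Toy model only: these are NOT Calcite classes. A vertex is reduced to a
// node name plus the digest that the planner uses as its map key.
final case class Vertex(name: String, digest: String)

object DigestCollisionDemo {
  // Replays steps 1, 6, 7 and 8 and returns whether Join2' is still
  // considered part of the DAG at the end.
  def run(): Boolean = {
    val mapDigestToVertex = mutable.Map.empty[String, Vertex]

    // Assumed shape of the membership check: a lookup by digest.
    def belongsToDag(v: Vertex): Boolean = mapDigestToVertex.contains(v.digest)

    // Join1' and Join2' are distinct vertices with an identical digest.
    val sharedDigest = "Join(updateAsRetraction=true)" // hypothetical digest text
    val join2Prime = Vertex("Join2'", sharedDigest)
    val join1Prime = Vertex("Join1'", sharedDigest)

    mapDigestToVertex(join2Prime.digest) = join2Prime // step 1
    mapDigestToVertex(join1Prime.digest) = join1Prime // step 6: overwrites Join2'

    // Step 7: Join1'' replaces Join1', which becomes unreachable, so
    // garbage collection removes the key of Join1'.
    val join1DoublePrime = Vertex("Join1''", "Join(accMode=AccRetract)")
    mapDigestToVertex(join1DoublePrime.digest) = join1DoublePrime
    mapDigestToVertex.remove(join1Prime.digest)

    // Step 8: the lookup for Join2' now misses.
    belongsToDag(join2Prime)
  }
}
```

      In this model `DigestCollisionDemo.run()` returns `false`: Join2' is still in the plan, but the map no longer knows its digest.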


      So there may be two possible fixes:
      1. The root cause is a flaw in HepPlanner. In `collectGarbage`, we should not simply remove a key from `mapDigestToVertex` for every node that is not reachable from the root. Maybe we could first check whether the same key also belongs to a vertex that is still reachable from the root.
      2. We could also avoid the bug by changing the `HepMatchOrder` of the HepPlanner that contains `SetUpdatesAsRetractionRule` from `BOTTOM_UP` to `TOP_DOWN`.
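
      A rough sketch of what suggestion 1 could look like, again on a toy model rather than on Calcite itself. `Vertex`, `GuardedSweep`, and `sweep` are hypothetical names, and repointing the entry at the reachable vertex is one possible reading of "check whether the key is the same as one that is reachable from root", not a confirmed fix.

```scala
import scala.collection.mutable

// Toy model only, not Calcite code.
final case class Vertex(name: String, digest: String)

object GuardedSweep {
  // Hypothetical helper: drop the digest entries of swept vertices, unless a
  // vertex that is still reachable from the root has the same digest. In that
  // case the entry is repointed at the reachable vertex instead of removed.
  def sweep(
      mapDigestToVertex: mutable.Map[String, Vertex],
      sweepSet: Set[Vertex],
      reachable: Set[Vertex]): Unit = {
    val liveByDigest: Map[String, Vertex] =
      reachable.map(v => v.digest -> v).toMap
    sweepSet.foreach { dead =>
      liveByDigest.get(dead.digest) match {
        case Some(live) => mapDigestToVertex(dead.digest) = live
        case None       => mapDigestToVertex.remove(dead.digest); ()
      }
    }
  }
}
```

      With Join1' in the sweep set and Join2' still reachable, the shared digest stays in the map and points at Join2', so a later digest lookup for Join2' would succeed.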

      I'm not sure whether the two solutions above are reasonable. Is there a better solution?

      Attachments

        1. screenshot-6.png
          354 kB
          Jing Zhang
        2. screenshot-5.png
          496 kB
          Jing Zhang
        3. screenshot-4.png
          272 kB
          Jing Zhang
        4. screenshot-3.png
          400 kB
          Jing Zhang
        5. screenshot-2.png
          319 kB
          Jing Zhang
        6. screenshot-1.png
          486 kB
          Jing Zhang
        7. RetractionRules1Test.scala
          2 kB
          Jing Zhang
        8. image-2019-11-26-14-54-34-797.png
          858 kB
          Jing Zhang

            People

              Assignee: Unassigned
              Reporter: Jing Zhang (jingzhang)