Flink / FLINK-34665

Add a streaming rule that converts Union to Rank, which is finally translated into StreamExecDeduplicate


Details

    • Type: Improvement
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 1.20.0
    • Fix Version/s: 1.20.0
    • Component/s: Table SQL / Planner
    • Labels: None

    Description

      The semantics of UNION in SQL include deduplication. When Calcite converts the SqlNode tree into a RelNode plan, this deduplication is expressed as a Distinct Aggregate placed above the Union. In Flink, that Distinct Aggregate is eventually translated into a StreamExecGroupAggregate operator. This operator accesses state multiple times, and in the thread stacks of many of the jobs we have observed, it is frequently stuck in state access. The reason is that the grouping key of the distinct aggregate consists of all fields of the union, so the state key is relatively large, and the repeated state lookups and key comparisons are time-consuming.
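A simplified Python model of the current plan may help illustrate the point. It assumes two insert-only inputs named `t1` and `t2` (hypothetical), treats `UNION` as `UNION ALL` followed by a group-by on every column, and uses an in-memory set as a stand-in for keyed state. It only models the result semantics and the fact that the state key is the entire row; it does not reproduce how StreamExecGroupAggregate actually reads and writes its state.

```python
from typing import Hashable, Iterable, Iterator, Set, Tuple

Row = Tuple[Hashable, ...]

# Illustrative SQL (table and column names are hypothetical):
#   SELECT a, b FROM t1 UNION SELECT a, b FROM t2
# is planned roughly as
#   SELECT a, b FROM (SELECT a, b FROM t1 UNION ALL SELECT a, b FROM t2) GROUP BY a, b


def union_all(*inputs: Iterable[Row]) -> Iterator[Row]:
    """Concatenate the inputs without removing duplicates (UNION ALL)."""
    for rows in inputs:
        yield from rows


def distinct_aggregate(rows: Iterable[Row]) -> Iterator[Row]:
    """Group by *all* fields: the state key is the whole row."""
    state: Set[Row] = set()  # stand-in for keyed state
    for row in rows:
        # Hashing and comparing the key touches every column of the row,
        # so wide rows mean large keys.
        if row not in state:
            state.add(row)
            yield row  # first occurrence of this row is emitted downstream


t1 = [(1, "a"), (2, "b")]
t2 = [(2, "b"), (3, "c")]
print(list(distinct_aggregate(union_all(t1, t2))))  # [(1, 'a'), (2, 'b'), (3, 'c')]
```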

      A possible optimization is to add a rule that converts the Union into a Rank ordered by processing time, which is then finally translated into a StreamExecDeduplicate. Currently, we have users rewrite their SQL to use ROW_NUMBER for deduplication, and this approach works very well. It should therefore be possible to support this optimization at the engine level with a planner rule.
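The manual rewrite mentioned above follows the ROW_NUMBER deduplication pattern (partition by the key, order by a time attribute, keep rn = 1). The sketch below shows that query shape in a comment and models keep-first-row semantics in Python. Table names, column names and the processing-time attribute `proc_time` are hypothetical, and rows are assumed to arrive in processing-time order, so "ORDER BY proc_time ASC ... WHERE rn = 1" reduces to "emit the first row seen for each key". It models semantics only, not the StreamExecDeduplicate implementation.

```python
from typing import Hashable, Iterable, Iterator, Sequence, Set, Tuple

Row = Tuple[Hashable, ...]

# Hypothetical manual rewrite of "SELECT a, b FROM t1 UNION SELECT a, b FROM t2",
# assuming both tables expose a processing-time attribute proc_time:
#   SELECT a, b FROM (
#     SELECT a, b,
#            ROW_NUMBER() OVER (PARTITION BY a, b ORDER BY proc_time ASC) AS rn
#     FROM (SELECT a, b, proc_time FROM t1
#           UNION ALL
#           SELECT a, b, proc_time FROM t2))
#   WHERE rn = 1


def dedup_keep_first_row(rows: Iterable[Row], key_fields: Sequence[int]) -> Iterator[Row]:
    """Emit only the first row seen for each partition key (rn = 1)."""
    seen: Set[Row] = set()  # per-key "already emitted" flag
    for row in rows:
        key = tuple(row[i] for i in key_fields)
        if key not in seen:
            seen.add(key)
            yield row


# For the UNION rewrite, the partition key is every output column.
arrivals = [(1, "a"), (2, "b"), (2, "b"), (3, "c"), (1, "a")]
print(list(dedup_keep_first_row(arrivals, key_fields=(0, 1))))
# [(1, 'a'), (2, 'b'), (3, 'c')]
```

In this model, for append-only input the rewrite emits the same rows as a distinct aggregate over all columns; what the proposal changes is which physical operator produces them.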

       

      However, such a rule changes the execution plan of existing queries, which breaks compatibility and could cause users' Flink version upgrades to fail. I therefore suggest adding a configuration flag whose default value keeps the current behavior.


          People

            Assignee: Unassigned
            Reporter: Jacky Lau (jackylau)
            Votes: 0
            Watchers: 2
