  Flink / FLINK-6260

Distinct Aggregates for Group By Windows


Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: Table SQL / API

    Description

      Time target: ProcTime/EventTime

      Targeted SQL query examples:
      ------------

      Q1. Window boundaries are expressed in the GROUP BY clause, and DISTINCT is applied to the elements of the aggregate(s).

      `SELECT MIN(DISTINCT rowtime), prodID FROM stream1 GROUP BY FLOOR(procTime() TO HOUR)`

      Q2. DISTINCT is applied to the set of output rows being selected.

      `SELECT STREAM DISTINCT procTime(), prodId FROM stream1 GROUP BY FLOOR(procTime() TO DAY)`

      => The DISTINCT operation makes sense only within the context of windows or other bounded structures. Otherwise, the operator would have to keep an unbounded amount of data to ensure uniqueness, and certain functions (e.g., aggregates) would never trigger.

      => We can follow the same design/implementation as in FLINK-6249 (supporting distinct aggregates for OVER windows).

      => The implementation of DISTINCT for SELECT clauses can be handled as a separate sub-JIRA issue.

      => Aggregations over distinct elements without any boundary (i.e., within a plain SELECT clause) make no sense, just as aggregations make no sense without groupings or windows.
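To make the state argument concrete, here is a minimal plain-Java sketch (not Flink code; the class and method names are hypothetical). It tracks distinct values per tumbling hourly window, keyed by the window start computed like `FLOOR(ts TO HOUR)`, and drops a window's set once time has passed the window end. An unbounded DISTINCT would instead need a single set that grows forever.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: per-window distinct state that is released when the window closes. */
class WindowedDistinctState<T> {
    static final long HOUR_MS = 60L * 60L * 1000L;

    // window start (epoch ms) -> distinct values seen in that window
    private final Map<Long, Set<T>> seenPerWindow = new HashMap<>();

    /** Equivalent of FLOOR(ts TO HOUR) for non-negative epoch milliseconds. */
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % HOUR_MS);
    }

    /** Returns true if the value had not been seen yet in its window. */
    boolean add(long timestampMs, T value) {
        return seenPerWindow
                .computeIfAbsent(windowStart(timestampMs), k -> new HashSet<>())
                .add(value);
    }

    /** Returns COUNT(DISTINCT) for each window ending at or before nowMs and frees that window's state. */
    Map<Long, Integer> closeWindowsUpTo(long nowMs) {
        Map<Long, Integer> results = new HashMap<>();
        Iterator<Map.Entry<Long, Set<T>>> it = seenPerWindow.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Set<T>> e = it.next();
            if (e.getKey() + HOUR_MS <= nowMs) {
                results.put(e.getKey(), e.getValue().size());
                it.remove(); // state of a closed window is discarded
            }
        }
        return results;
    }

    /** Number of windows whose distinct sets are still held in memory. */
    int openWindows() {
        return seenPerWindow.size();
    }
}
```

The retained state is bounded by the distinct values of the windows that are still open, which is exactly what the windowed scope buys.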

      If DISTINCT is applied to the group elements, as in the Q1 example, then we either define a new implementation (if the selection is general) or extend the current implementation of grouped aggregates with distinct group aggregates.
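One way to picture "extending grouped aggregates with distinct group aggregates" is a decorator that filters duplicates before they reach an existing aggregate. The `SimpleAggregate` interface below is a hypothetical simplification for illustration, not Flink's `AggregateFunction` API.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical minimal aggregate contract (not Flink's AggregateFunction). */
interface SimpleAggregate<IN, OUT> {
    void accumulate(IN value);
    OUT getValue();
    void reset();
}

/** COUNT over non-null values. */
class CountAggregate<IN> implements SimpleAggregate<IN, Long> {
    private long count;
    public void accumulate(IN value) { if (value != null) count++; }
    public Long getValue() { return count; }
    public void reset() { count = 0; }
}

/** Decorator: forwards a value to the wrapped aggregate only the first time it appears in the current group. */
class DistinctAggregate<IN, OUT> implements SimpleAggregate<IN, OUT> {
    private final SimpleAggregate<IN, OUT> delegate;
    private final Set<IN> seen = new HashSet<>();

    DistinctAggregate(SimpleAggregate<IN, OUT> delegate) {
        this.delegate = delegate;
    }

    public void accumulate(IN value) {
        if (seen.add(value)) { // true only for values not seen in this group yet
            delegate.accumulate(value);
        }
    }

    public OUT getValue() { return delegate.getValue(); }

    /** Called when the group scope (e.g. one FLOOR(... TO HOUR) window) ends. */
    public void reset() {
        seen.clear();
        delegate.reset();
    }
}
```

With this design each distinct aggregate owns its own seen-set, which is simple but duplicates state when several aggregates use DISTINCT on the same field.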

      If DISTINCT is applied to all selected elements, as in the Q2 example, then a new implementation needs to be defined. It would work over a specific window / ProcessFunction, and uniqueness of the results would be enforced within the processing function, separately for each partition. The data structure used to track distinct elements is reset with each new window (i.e., each GROUP BY scope).
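The per-partition logic described above might look roughly like the plain-Java model below. The class name `PartitionedWindowDistinct` is hypothetical. In Flink the sets would presumably live in keyed state (e.g. `MapState`) of a keyed process function and be cleared by a timer at the window end; this sketch only models that logic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of SELECT DISTINCT over a tumbling window, evaluated per partition. */
class PartitionedWindowDistinct<K, V> {
    // partition key -> distinct values of the current window, in first-seen order
    private final Map<K, Set<V>> current = new HashMap<>();

    /** Records a value for its partition; duplicates within the window are absorbed by the set. */
    void process(K partition, V value) {
        current.computeIfAbsent(partition, p -> new LinkedHashSet<>()).add(value);
    }

    /** Emits the distinct values of one partition for the closed window and resets its tracking structure. */
    List<V> closeWindow(K partition) {
        Set<V> seen = current.remove(partition);
        return seen == null ? new ArrayList<V>() : new ArrayList<V>(seen);
    }
}
```

Removing the partition's set on `closeWindow` is what implements "reset with each new window": the next window starts from an empty structure.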

      Examples
      ------------
      `Q1: SELECT STREAM DISTINCT b FROM stream1 GROUP BY FLOOR(PROCTIME() TO HOUR)`

      `Q2: SELECT COUNT(DISTINCT b) FROM stream1 GROUP BY FLOOR(PROCTIME() TO HOUR)`

      Proctime/IngestionTime (Event)   Stream1   Q1       Q2
      10:00:01                         (ab,1)
      10:05:00                         (aa,2)
      11:00:00                                   ab,aa    2
      11:03:00                         (aa,2)
      11:09:00                         (aa,2)
      12:00:00                                   aa       1
      ...
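The table can be reproduced with a small stand-alone Java program that groups the input rows by hour (the equivalent of `FLOOR(PROCTIME() TO HOUR)`) and evaluates both queries per window. Representing processing time as seconds since midnight, and the class and field names, are assumptions of this sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Replays the example table: Q1 = SELECT DISTINCT b, Q2 = COUNT(DISTINCT b), per hourly window. */
class HourlyDistinctExample {
    /** One input row of stream1: processing time plus the tuple (b, second field). */
    static final class Row {
        final int secondOfDay; // processing time as seconds since midnight (sketch assumption)
        final String b;        // first tuple field, the one Q1/Q2 refer to
        final int other;       // second tuple field, unused by Q1/Q2
        Row(int secondOfDay, String b, int other) {
            this.secondOfDay = secondOfDay;
            this.b = b;
            this.other = other;
        }
    }

    static int time(int h, int m, int s) {
        return h * 3600 + m * 60 + s;
    }

    /** Window hour -> distinct b values in first-seen order; the set size is COUNT(DISTINCT b). */
    static Map<Integer, Set<String>> distinctPerHour(List<Row> rows) {
        Map<Integer, Set<String>> windows = new TreeMap<>();
        for (Row r : rows) {
            int hour = r.secondOfDay / 3600; // FLOOR(PROCTIME() TO HOUR)
            windows.computeIfAbsent(hour, h -> new LinkedHashSet<>()).add(r.b);
        }
        return windows;
    }

    static List<Row> tableInput() {
        List<Row> rows = new ArrayList<>();
        rows.add(new Row(time(10, 0, 1), "ab", 1));
        rows.add(new Row(time(10, 5, 0), "aa", 2));
        rows.add(new Row(time(11, 3, 0), "aa", 2));
        rows.add(new Row(time(11, 9, 0), "aa", 2));
        return rows;
    }

    public static void main(String[] args) {
        // Each window [h:00, h+1:00) fires at its end, as in the table.
        for (Map.Entry<Integer, Set<String>> w : distinctPerHour(tableInput()).entrySet()) {
            System.out.println((w.getKey() + 1) + ":00:00  Q1=" + w.getValue() + "  Q2=" + w.getValue().size());
        }
    }
}
```

Running `main` prints `11:00:00  Q1=[ab, aa]  Q2=2` followed by `12:00:00  Q1=[aa]  Q2=1`, matching the rows of the table.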

      Implementation option
      ---------------------
      Since the behavior is similar to the one implemented for OVER windows (with the difference that distinction is reset for each group scope), the implementation will reuse the existing implementation of the OVER window functions. Distinction can be achieved either within the aggregate itself or within the window/ProcessFunction logic that computes the aggregates. Because multiple aggregates requiring distinction can be computed at the same time, the preferred option is to enforce distinction within the process logic. For selecting distinct outputs (i.e., not aggregates), we can follow the same design: support distinction in the aggregation logic and then emit only one output for each distinct element seen (instead of calling the aggregate method of the aggregates).
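The preferred option, enforcing distinction in the process logic rather than inside each aggregate, could be sketched as follows. The window logic keeps one seen-set per input field and, for each incoming row, decides once per field whether the value is new; only then does it call the accumulate step of every DISTINCT aggregate on that field. `DistinctWindowProcessor` and `DistinctAgg` are hypothetical names, and aggregates are modeled as plain `LongBinaryOperator`s rather than Flink aggregate functions.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.LongBinaryOperator;

/** Hypothetical sketch: distinction handled once in the window logic, shared by several DISTINCT aggregates. */
class DistinctWindowProcessor {
    /** One DISTINCT aggregate: which input field it reads and how it folds values. */
    static final class DistinctAgg {
        final int field;
        final long initial;
        final LongBinaryOperator combine;
        long acc;

        DistinctAgg(int field, long initial, LongBinaryOperator combine) {
            this.field = field;
            this.initial = initial;
            this.combine = combine;
            this.acc = initial;
        }
    }

    private final List<DistinctAgg> aggs;
    // input field index -> values already seen in the current window
    private final Map<Integer, Set<Long>> seenPerField = new HashMap<>();

    DistinctWindowProcessor(List<DistinctAgg> aggs) {
        this.aggs = aggs;
    }

    void processElement(long[] row) {
        // Decide once per field whether the value is new in this window ...
        Map<Integer, Boolean> isNew = new HashMap<>();
        for (DistinctAgg a : aggs) {
            if (!isNew.containsKey(a.field)) {
                Set<Long> seen = seenPerField.computeIfAbsent(a.field, k -> new HashSet<>());
                isNew.put(a.field, seen.add(row[a.field]));
            }
        }
        // ... then accumulate only the aggregates whose field value is new.
        for (DistinctAgg a : aggs) {
            if (isNew.get(a.field)) {
                a.acc = a.combine.applyAsLong(a.acc, row[a.field]);
            }
        }
    }

    /** Emits the results for the finished window and resets accumulators and seen-sets. */
    long[] closeWindow() {
        long[] out = new long[aggs.size()];
        for (int i = 0; i < aggs.size(); i++) {
            out[i] = aggs.get(i).acc;
            aggs.get(i).acc = aggs.get(i).initial;
        }
        seenPerField.clear();
        return out;
    }
}
```

For example, `COUNT(DISTINCT f0)` and `SUM(DISTINCT f0)` both read field 0 and therefore share one seen-set, which is the saving over giving each aggregate its own filter.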


          People

            Assignee: Unassigned
            Reporter: radu (rtudoran)
            Votes: 0
            Watchers: 3
