FLINK-6473

Add OVER window support for batch tables

Details

    Description

      Add support for OVER windows for batch tables.

      Since OVER windows are supported for streaming tables, this issue is not about the API (which is available) but about adding the execution strategies and translation for OVER windows on batch tables.

      The feature could be implemented using the following plans:

      UNBOUNDED OVER

      DataSet[Row] input = ...
      DataSet[Row] result = input
        .groupBy(partitionKeys)
        .sortGroup(orderByKeys)
        .reduceGroup(computeAggregates)
      

      This implementation is quite straightforward because we don't need to retract rows.
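The UNBOUNDED OVER plan can be illustrated outside Flink. The following is a minimal plain-Python sketch, not Flink code: it assumes a SUM aggregate over a window from UNBOUNDED PRECEDING to CURRENT ROW, and the function name, the dict-based rows, and the "agg" output field are hypothetical choices made for illustration.

```python
from itertools import groupby
from operator import itemgetter


def unbounded_over_sum(rows, partition_key, order_key, value_key):
    """Attach a running sum per partition to every row (illustrative only)."""
    # Stand-in for groupBy + sortGroup: sorting by (partition, order) makes
    # every partition contiguous and ordered.
    ordered = sorted(rows, key=itemgetter(partition_key, order_key))
    result = []
    for _, group in groupby(ordered, key=itemgetter(partition_key)):
        running = 0  # stand-in for reduceGroup: the aggregate only accumulates
        for row in group:
            running += row[value_key]
            result.append({**row, "agg": running})
    return result
```

Because the lower window bound never moves, a row that has been accumulated is never removed again, which is why no retraction logic is needed here.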

      BOUNDED OVER

      BOUNDED OVER windows are a bit more challenging, because we need to retract values from the aggregates and we don't want to store rows temporarily on the heap.

      DataSet[Row] input = ...
      DataSet[Row] sorted = input
        .partitionByHash(partitionKeys)
        .sortPartition(partitionKeys, orderByKeys)
      DataSet[Row] result = sorted.coGroup(sorted)
        .where(partitionKeys).equalTo(partitionKeys)
        .with(computeAggregates)
      

      With this plan, the data set should be partitioned and sorted only once. The sorted DataSet would be consumed twice (the optimizer should inject a temp barrier on one of the inputs to avoid a consumption deadlock). The CoGroupFunction would accumulate new rows into the aggregates from one input and retract them from the other. Since both inputs are sorted in the same order, this can happen in a zigzag fashion. We need to verify that the generated plan is what we want it to be.
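The zigzag accumulate/retract idea can be sketched in plain Python for a single, already sorted partition. This is only an illustration under assumptions, not the proposed CoGroupFunction: it assumes a SUM aggregate and a ROWS BETWEEN n PRECEDING AND CURRENT ROW window, and the function and parameter names are hypothetical. The two iterators stand in for the two inputs of the coGroup that read the same sorted data.

```python
def bounded_over_sum(sorted_values, preceding):
    """Sliding sum over one sorted partition using two cursors (illustrative only)."""
    accumulate_side = iter(sorted_values)  # stand-in for the first coGroup input
    retract_side = iter(sorted_values)     # stand-in for the second coGroup input
    total = 0
    rows_in_window = 0
    result = []
    for value in accumulate_side:
        # Accumulate the newly arrived row into the aggregate.
        total += value
        rows_in_window += 1
        # If the window now holds more than `preceding` + 1 rows, advance the
        # trailing cursor by one row and retract that row from the aggregate.
        if rows_in_window > preceding + 1:
            total -= next(retract_side)
            rows_in_window -= 1
        result.append(total)
    return result
```

The sketch keeps only the aggregate and a row counter as state; the rows that leave the window are re-read from the second cursor instead of being buffered, which mirrors the goal of not storing rows on the heap. Note that `sorted_values` must be re-iterable (for example a list), since both cursors read it independently.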

          People

            Assignee: Unassigned
            Reporter: Fabian Hueske (fhueske)