  1. Flink
  2. FLINK-5582

Add a general distributive aggregate function

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Implemented
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Streaming
    • Labels:
      None

      Description

      The DataStream API currently has two aggregation functions that can be used on windows and in state, both of which have limitations:

      • ReduceFunction supports only a single type: the type of the added values is also the type that is aggregated and returned.
      • FoldFunction supports different types to add and return, but is not distributive, i.e. it cannot be used for hierarchical aggregation, for example to split the aggregation into a pre-aggregation and a final aggregation.

      I suggest adding a generic aggregation function that supports:

      • Different types to add, accumulate, and return
      • The ability to merge partial aggregates by merging their accumulators.

      The proposed interface is below. This type of interface is found in many APIs, like that of various databases, and also in Apache Beam:

      • The accumulator is the state of the running aggregate
      • Accumulators can be merged
      • Values are added to the accumulator
      • Getting the result from the accumulator performs an optional finalizing operation
      public interface AggregateFunction<IN, ACC, OUT> extends Function {
      
      	ACC createAccumulator();
      
      	void add(IN value, ACC accumulator);
      
      	OUT getResult(ACC accumulator);
      
      	ACC merge(ACC a, ACC b);
      }
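The lifecycle described by the bullets above (create an accumulator, add values, get the result) can be sketched as a small driver. This is only an illustration: the interface is copied locally without the Flink `Function` supertype so that the snippet compiles on its own, and the `aggregate` helper and the `TotalLength` function are hypothetical names that are not part of the proposal.

```java
import java.util.Arrays;

public class AggregateLifecycleSketch {

    // Local copy of the proposed interface (assumption: the Flink `Function`
    // supertype is dropped here so the sketch is self-contained).
    interface AggregateFunction<IN, ACC, OUT> {
        ACC createAccumulator();
        void add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    // Hypothetical helper: runs one aggregation over a sequence of values.
    static <IN, ACC, OUT> OUT aggregate(AggregateFunction<IN, ACC, OUT> fn, Iterable<IN> values) {
        ACC acc = fn.createAccumulator();   // state of the running aggregate
        for (IN value : values) {
            fn.add(value, acc);             // values are added to the accumulator
        }
        return fn.getResult(acc);           // optional finalizing step
    }

    // Hypothetical function with three different types:
    // IN = String, ACC = long[] (one mutable cell), OUT = Long.
    static class TotalLength implements AggregateFunction<String, long[], Long> {
        public long[] createAccumulator() {
            return new long[] {0L};
        }

        public void add(String value, long[] acc) {
            acc[0] += value.length();
        }

        public Long getResult(long[] acc) {
            return acc[0];
        }

        public long[] merge(long[] a, long[] b) {
            a[0] += b[0];
            return a;
        }
    }

    public static void main(String[] args) {
        // Total length of "a", "bb", "ccc"
        System.out.println(aggregate(new TotalLength(), Arrays.asList("a", "bb", "ccc")));
    }
}
```

A ReduceFunction could not express this directly, because the input type (String) differs from the aggregated type.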
      

      Example use:

      public class AverageAccumulator {
          long count;
          long sum;
      }
      
      // implementation of a simple average
      public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
      
          public AverageAccumulator createAccumulator() {
              return new AverageAccumulator();
          }
      
          public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
              a.count += b.count;
              a.sum += b.sum;
              return a;
          }
      
          public void add(Integer value, AverageAccumulator acc) {
              acc.sum += value;
              acc.count++;
          }
      
          public Double getResult(AverageAccumulator acc) {
              return acc.sum / (double) acc.count;
          }
      }
      
      // implementation of a weighted average
      // this reuses the same accumulator type as the aggregate function for 'average'
      public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
      
          public AverageAccumulator createAccumulator() {
              return new AverageAccumulator();
          }
      
          public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
              a.count += b.count;
              a.sum += b.sum;
              return a;
          }
      
          public void add(Datum value, AverageAccumulator acc) {
              // 'count' holds the total weight, 'sum' holds the weighted sum of values
              acc.count += value.getWeight();
              acc.sum += value.getValue() * value.getWeight();
          }
      
          public Double getResult(AverageAccumulator acc) {
              return acc.sum / (double) acc.count;
          }
      }
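To illustrate the hierarchical aggregation that motivates `merge`, the following sketch pre-aggregates each partition separately and then combines the partial accumulators in a final step. It is a minimal illustration, not how a runtime would schedule the work: the interface is a local copy without the Flink `Function` supertype, and `preAggregate` and `averageOverPartitions` are hypothetical helper names.

```java
import java.util.Arrays;
import java.util.List;

public class PartialAggregationSketch {

    // Local copy of the proposed interface (assumption: no Flink dependency).
    interface AggregateFunction<IN, ACC, OUT> {
        ACC createAccumulator();
        void add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    static class AverageAccumulator {
        long count;
        long sum;
    }

    // Same logic as the 'Average' example in the description.
    static class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
        public AverageAccumulator createAccumulator() {
            return new AverageAccumulator();
        }

        public void add(Integer value, AverageAccumulator acc) {
            acc.sum += value;
            acc.count++;
        }

        public Double getResult(AverageAccumulator acc) {
            return acc.sum / (double) acc.count;
        }

        public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
            a.count += b.count;
            a.sum += b.sum;
            return a;
        }
    }

    // Pre-aggregation: builds a partial accumulator for one partition.
    static <IN, ACC> ACC preAggregate(AggregateFunction<IN, ACC, ?> fn, List<IN> partition) {
        ACC acc = fn.createAccumulator();
        for (IN value : partition) {
            fn.add(value, acc);
        }
        return acc;
    }

    // Final aggregation: merges the partial accumulators and finalizes once.
    static double averageOverPartitions(List<List<Integer>> partitions) {
        Average fn = new Average();
        AverageAccumulator merged = fn.createAccumulator();
        for (List<Integer> partition : partitions) {
            merged = fn.merge(merged, preAggregate(fn, partition));
        }
        return fn.getResult(merged);
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions =
                Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6, 7));
        // Same result as averaging 1..7 in a single pass
        System.out.println(averageOverPartitions(partitions)); // prints 4.0
    }
}
```

Because only accumulators are merged and `getResult` runs once at the end, the result does not depend on how the input was split across partitions.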
      

              People

              • Assignee:
                StephanEwen Stephan Ewen
                Reporter:
                StephanEwen Stephan Ewen
