  1. Flink
  2. FLINK-5582

Add a general distributive aggregate function

    Details

    • Type: New Feature
    • Status: Closed
    • Priority: Major
    • Resolution: Implemented
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Streaming
    • Labels:
      None

      Description

      The DataStream API currently has two aggregation functions that can be used on windows and in state, both of which have limitations:

      • ReduceFunction only supports one type as the type that is added and aggregated/returned.
      • FoldFunction supports different types to add and return, but is not distributive, i.e. it cannot be used for hierarchical aggregation, for example to split the aggregation into pre- and final-aggregation.
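To illustrate the second limitation, here is a minimal sketch of why a fold-style function cannot combine partial results. It does not use Flink's FoldFunction; `java.util.function.BiFunction` stands in for it, and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Stand-in for a fold-style function: (accumulator, element) -> accumulator.
// Names here are illustrative and not part of the Flink API.
class FoldLimitationSketch {

    // Counts elements: accumulator type is Long, input type is Integer.
    static final BiFunction<Long, Integer, Long> COUNT_FOLD = (acc, value) -> acc + 1;

    // Folds a whole partition into one partial count.
    static long fold(List<Integer> values) {
        long acc = 0L;
        for (Integer v : values) {
            acc = COUNT_FOLD.apply(acc, v);
        }
        return acc;
    }

    // The only available operation takes an element, not a second accumulator.
    // Feeding the right partial in as if it were an element counts it once.
    static long misuseFoldAsMerge(long left, long right) {
        return COUNT_FOLD.apply(left, (int) right);
    }

    public static void main(String[] args) {
        long left = fold(Arrays.asList(10, 20, 30));      // 3 elements
        long right = fold(Arrays.asList(40, 50, 60, 70)); // 4 elements
        long wrong = misuseFoldAsMerge(left, right);      // 4, not 7
        long correct = left + right;                      // needs a merge step the fold does not declare
        System.out.println(wrong + " vs " + correct);     // prints "4 vs 7"
    }
}
```

The correct combination (`left + right`) is a separate operation on two accumulators, which is exactly what a fold signature does not provide.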

      I suggest adding a generic and powerful aggregation function that supports:

      • Different types to add, accumulate, and return
      • The ability to merge partial aggregates by merging their accumulators.

      The proposed interface is below. This type of interface is found in many APIs, like that of various databases, and also in Apache Beam:

      • The accumulator is the state of the running aggregate
      • Accumulators can be merged
      • Values are added to the accumulator
      • Getting the result from the accumulator performs an optional finalizing operation
      public interface AggregateFunction<IN, ACC, OUT> extends Function {
      
      	ACC createAccumulator();
      
      	void add(IN value, ACC accumulator);
      
      	OUT getResult(ACC accumulator);
      
      	ACC merge(ACC a, ACC b);
      }
      

      Example use:

      public class AverageAccumulator {
          long count;
          long sum;
      }
      
      // implementation of a simple average
      public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
      
          public AverageAccumulator createAccumulator() {
              return new AverageAccumulator();
          }
      
          public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
              a.count += b.count;
              a.sum += b.sum;
              return a;
          }
      
          public void add(Integer value, AverageAccumulator acc) {
              acc.sum += value;
              acc.count++;
          }
      
          public Double getResult(AverageAccumulator acc) {
              return acc.sum / (double) acc.count;
          }
      }
      
      // implementation of a weighted average
      // this reuses the same accumulator type as the aggregate function for 'average'
      public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
      
          public AverageAccumulator createAccumulator() {
              return new AverageAccumulator();
          }
      
          public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
              a.count += b.count;
              a.sum += b.sum;
              return a;
          }
      
          public void add(Datum value, AverageAccumulator acc) {
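              acc.count += value.getWeight();
              // weight each value so that getResult() yields sum(weight * value) / sum(weight)
              acc.sum += value.getValue() * value.getWeight();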
          }
      
          public Double getResult(AverageAccumulator acc) {
              return acc.sum / (double) acc.count;
          }
      }
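The pre-/final-aggregation split mentioned in the description can be sketched as follows. This is a self-contained illustration, not Flink code: `SketchAggregateFunction` is a local copy of the proposed interface, and all class names and the `preAggregate` helper are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Local stand-in mirroring the interface proposed above (not the Flink class).
interface SketchAggregateFunction<IN, ACC, OUT> {
    ACC createAccumulator();
    void add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

class SketchAverageAccumulator {
    long count;
    long sum;
}

class SketchAverage implements SketchAggregateFunction<Integer, SketchAverageAccumulator, Double> {
    public SketchAverageAccumulator createAccumulator() {
        return new SketchAverageAccumulator();
    }
    public void add(Integer value, SketchAverageAccumulator acc) {
        acc.sum += value;
        acc.count++;
    }
    public Double getResult(SketchAverageAccumulator acc) {
        return acc.sum / (double) acc.count;
    }
    public SketchAverageAccumulator merge(SketchAverageAccumulator a, SketchAverageAccumulator b) {
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }
}

class PreAggregationSketch {
    // Pre-aggregation: reduce one partition to a partial accumulator.
    static <IN, ACC, OUT> ACC preAggregate(SketchAggregateFunction<IN, ACC, OUT> fn, List<IN> partition) {
        ACC acc = fn.createAccumulator();
        for (IN value : partition) {
            fn.add(value, acc);
        }
        return acc;
    }

    // Final aggregation: merge the partial accumulators, then extract the result.
    static double twoPhaseAverage(List<Integer> first, List<Integer> second) {
        SketchAverage avg = new SketchAverage();
        SketchAverageAccumulator a = preAggregate(avg, first);
        SketchAverageAccumulator b = preAggregate(avg, second);
        return avg.getResult(avg.merge(a, b));
    }

    public static void main(String[] args) {
        // Same result as averaging all seven values in one pass: 28 / 7
        System.out.println(twoPhaseAverage(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6, 7))); // prints 4.0
    }
}
```

Because the partial state is (count, sum) rather than a finished average, the merged result does not depend on how the input was partitioned.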
      


          Activity

          fhueske Fabian Hueske added a comment -

          Thanks for the proposal! This will be a great feature for the DataStream API and will be very helpful for efficient aggregations.

          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3186#discussion_r97226914

          — Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java —
          @@ -0,0 +1,94 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.api.common.functions;
          +
          +import java.io.Serializable;
          +
          +/**
          + *
          + * <p>Aggregation functions must be {@link Serializable} because they are sent around
          + * between distributed processes during distributed execution.
          + *
          + * <p>An example how to use this interface is below:
          + *
          + * <pre>{@code
          + * // the accumulator, which holds the state of the in-flight aggregate
          + * public class AverageAccumulator {
          + *     long count;
          + *     long sum;
          + * }
          + *
          + * // implementation of an aggregation function for an 'average'
          + * public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
          + *
          + *     public AverageAccumulator createAccumulator() {
          + *         return new AverageAccumulator();
          + *     }
          + *
          + *     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
          + *         a.count += b.count;
          + *         a.sum += b.sum;
          + *         return a;
          + *     }
          + *
          + *     public void add(Integer value, AverageAccumulator acc) {
          + *         acc.sum += value;
          + *         acc.count++;
          + *     }
          + *
          + *     public Double getResult(AverageAccumulator acc) {
          + *         return acc.sum / (double) acc.count;
          + *     }
          + * }
          + *
          + * // implementation of a weighted average
          + * // this reuses the same accumulator type as the aggregate function for 'average'
          + * public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
          + *
          + *     public AverageAccumulator createAccumulator() {
          + *         return new AverageAccumulator();
          + *     }
          + *
          + *     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
          + *         a.count += b.count;
          + *         a.sum += b.sum;
          + *         return a;
          + *     }
          + *
          + *     public void add(Datum value, AverageAccumulator acc) {
          + *         acc.count += value.getWeight();
          + *         acc.sum += value.getValue();
          + *     }
          + *
          + *     public Double getResult(AverageAccumulator acc) {
          + *         return acc.sum / (double) acc.count;
          + *     }
          + * }
          + * }</pre>
          + */
          +public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
          +
          + ACC createAccumulator();
          +
          + void add(IN value, ACC accumulator);
          — End diff –

          As proposed in https://goo.gl/00ea5j, this function will handle not only accumulation but also retraction. Instead of "add", could you please consider using "update"?
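For readers unfamiliar with retraction, the idea can be sketched as follows. The linked proposal is not reproduced in this issue, so the method names and signatures below (`accumulate`, `retract`) are assumptions made for illustration, not the API that was proposed or adopted.

```java
// Hypothetical names throughout; this only illustrates the accumulate/retract idea.
class RetractableAverageAccumulator {
    long count;
    long sum;
}

class RetractableAverageSketch {

    static RetractableAverageAccumulator createAccumulator() {
        return new RetractableAverageAccumulator();
    }

    // Accumulate: fold one value into the running aggregate.
    static void accumulate(int value, RetractableAverageAccumulator acc) {
        acc.sum += value;
        acc.count++;
    }

    // Retract: undo the effect of a value that was accumulated earlier.
    static void retract(int value, RetractableAverageAccumulator acc) {
        acc.sum -= value;
        acc.count--;
    }

    static double getResult(RetractableAverageAccumulator acc) {
        return acc.sum / (double) acc.count;
    }

    // Accumulates 1, 2 and 6, then retracts 6, leaving the average of 1 and 2.
    static double demo() {
        RetractableAverageAccumulator acc = createAccumulator();
        accumulate(1, acc);
        accumulate(2, acc);
        accumulate(6, acc);
        retract(6, acc);
        return getResult(acc);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 1.5
    }
}
```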

          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3186#discussion_r97226958

          — Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java —
          @@ -0,0 +1,94 @@
          +public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
          +
          + ACC createAccumulator();
          +
          + void add(IN value, ACC accumulator);
          +
          + OUT getResult(ACC accumulator);
          +
          + ACC merge(ACC a, ACC b);
          — End diff –

          Do you think it would be useful to extend the merge function to accept a list of ACC: ACC merge(List<ACC> a)? There are cases where merging a list of instances as a group is much more efficient than merging only two instances at a time.
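A rough sketch of the suggested signature next to the pairwise one, using the average accumulator from the example. The class and method names are hypothetical, and the list-based `merge` is only the variant suggested in this comment, not part of the interface in the diff.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical names; illustrates pairwise versus list-based merging only.
class ListMergeAccumulator {
    long count;
    long sum;
}

class ListMergeSketch {

    // Pairwise merge, matching the shape of merge(ACC a, ACC b) in the diff.
    static ListMergeAccumulator merge(ListMergeAccumulator a, ListMergeAccumulator b) {
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }

    // Suggested variant: merge a whole group in one call, leaving the inputs untouched.
    static ListMergeAccumulator merge(List<ListMergeAccumulator> accumulators) {
        ListMergeAccumulator result = new ListMergeAccumulator();
        for (ListMergeAccumulator acc : accumulators) {
            result.count += acc.count;
            result.sum += acc.sum;
        }
        return result;
    }

    static ListMergeAccumulator of(long count, long sum) {
        ListMergeAccumulator acc = new ListMergeAccumulator();
        acc.count = count;
        acc.sum = sum;
        return acc;
    }

    // Merges three partial aggregates: 10 values with a total of 100.
    static double demo() {
        ListMergeAccumulator merged = merge(Arrays.asList(of(2, 10), of(3, 20), of(5, 70)));
        return merged.sum / (double) merged.count;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 10.0
    }
}
```

For a plain count and sum the two variants do the same amount of work; the efficiency gain referred to in the comment would presumably show up with accumulators whose pairwise merge is expensive.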

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3186

          Update: The first version had an issue with binary compatibility in the Scala DataStream API:

          The Scala API accidentally exposed in Flink 1.0 a public method with an internal type as a parameter (an internal utility method). That should not have been the case, since the compatibility of a method cannot be guaranteed when a parameter is a non-public type.

          Unfortunately, the automatic tooling for binary compatibility checks is not flexible enough to work around that: Exclusions for methods do not work if parameter types were altered.

          Due to that, the newer version rolls back the cleanup commit that renames the internal `AggregateFunction`.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3186#discussion_r97227249

          — Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java —
          @@ -0,0 +1,94 @@
          +public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
          +
          + ACC createAccumulator();
          +
          + void add(IN value, ACC accumulator);
          +
          + OUT getResult(ACC accumulator);
          +
          + ACC merge(ACC a, ACC b);
          — End diff –

          This could be done, true. It would currently mean a slight overhead (for list creation), but that is probably okay.

          I would like to address that change in a separate pull request: We should probably adjust the merging state implementation as well, to exploit that new signature.

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3186#discussion_r97227303

          — Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java —
          @@ -0,0 +1,94 @@
          +public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
          +
          + ACC createAccumulator();
          +
          + void add(IN value, ACC accumulator);
          — End diff –

          Retractions are specific to the Table API. Do you expect this same interface to be used for user-defined aggregations there as well?

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3186#discussion_r97227451

          — Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java —
          @@ -0,0 +1,94 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.api.common.functions;
          +
          +import java.io.Serializable;
          +
          +/**
          + *
          + * <p>Aggregation functions must be {@link Serializable} because they are sent around
          + * between distributed processes during distributed execution.
          + *
          + * <p>An example how to use this interface is below:
          + *
          + * <pre>{@code
          + * // the accumulator, which holds the state of the in-flight aggregate
          + * public class AverageAccumulator {
          + *     long count;
          + *     long sum;
          + * }
          + *
          + * // implementation of an aggregation function for an 'average'
          + * public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
          + *
          + *     public AverageAccumulator createAccumulator() {
          + *         return new AverageAccumulator();
          + *     }
          + *
          + *     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
          + *         a.count += b.count;
          + *         a.sum += b.sum;
          + *         return a;
          + *     }
          + *
          + *     public void add(Integer value, AverageAccumulator acc) {
          + *         acc.sum += value;
          + *         acc.count++;
          + *     }
          + *
          + *     public Double getResult(AverageAccumulator acc) {
          + *         return acc.sum / (double) acc.count;
          + *     }
          + * }
          + *
          + * // implementation of a weighted average
          + * // this reuses the same accumulator type as the aggregate function for 'average'
          + * public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
          + *
          + *     public AverageAccumulator createAccumulator() {
          + *         return new AverageAccumulator();
          + *     }
          + *
          + *     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
          + *         a.count += b.count;
          + *         a.sum += b.sum;
          + *         return a;
          + *     }
          + *
          + *     public void add(Datum value, AverageAccumulator acc) {
          + *         acc.count += value.getWeight();
          + *         acc.sum += value.getValue();
          + *     }
          + *
          + *     public Double getResult(AverageAccumulator acc) {
          + *         return acc.sum / (double) acc.count;
          + *     }
          + * }
          + * }</pre>
          + */
          +public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
          +
          + ACC createAccumulator();
          +
          + void add(IN value, ACC accumulator);
          — End diff –

          My first feeling is to keep the name `add()` because it fits better together with the term `Accumulator`. One can view retractions as adding negative values. What do you think about that?
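To make the "retractions as adding negative values" idea concrete, here is a minimal sketch, not the code from the pull request. It assumes a local stand-in for the proposed interface (so it compiles without Flink) and a hypothetical `Datum` type carrying a signed weight. Unlike the Javadoc example quoted in the diff, this sketch multiplies the value by its weight before summing, which is an assumption about the intended semantics of a weighted average.

```java
import java.io.Serializable;

// Local stand-in mirroring the interface proposed in the diff (assumption: no Flink dependency).
interface AggregateFunction<IN, ACC, OUT> extends Serializable {
    ACC createAccumulator();
    void add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Hypothetical input record: a value with a signed weight (a negative weight means "retract").
class Datum {
    private final long value;
    private final long weight;

    Datum(long value, long weight) {
        this.value = value;
        this.weight = weight;
    }

    long getValue() { return value; }
    long getWeight() { return weight; }
}

// Accumulator holding the state of the in-flight aggregate.
class AverageAccumulator {
    long count;
    long sum;
}

// Weighted average whose add() also serves as retract() when the weight is negative.
class SignedWeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {

    public AverageAccumulator createAccumulator() {
        return new AverageAccumulator();
    }

    public void add(Datum value, AverageAccumulator acc) {
        acc.count += value.getWeight();
        acc.sum += value.getValue() * value.getWeight();
    }

    public Double getResult(AverageAccumulator acc) {
        return acc.sum / (double) acc.count;
    }

    public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }
}

class RetractionDemo {
    static double run() {
        SignedWeightedAverage agg = new SignedWeightedAverage();
        AverageAccumulator acc = agg.createAccumulator();
        agg.add(new Datum(10, 1), acc);
        agg.add(new Datum(20, 1), acc);
        agg.add(new Datum(60, 1), acc);
        agg.add(new Datum(60, -1), acc); // retract the earlier 60 by adding it with weight -1
        return agg.getResult(acc);       // sum = 30, count = 2
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With this shape, no separate `retract()` method is needed as long as the input type can encode the sign.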

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3186

          @shaoxuan-wang I cannot reproduce the compile error you posted.
          The latest commit also gets a green light from Travis CI: https://travis-ci.org/StephanEwen/incubator-flink/builds/194239397

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3186

          Since this constantly requires extensive merge conflict resolution against master, I want to merge this soon.

          @shaoxuan-wang has tested it live, and CI passes as well.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3186

          StephanEwen Stephan Ewen added a comment -

          Implemented in 09380e49256bff924734b9a932808e0f4daa7e5c

          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3186#discussion_r97246135

          — Diff: flink-core/src/main/java/org/apache/flink/api/common/functions/AggregateFunction.java —
          @@ -0,0 +1,94 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +
          +package org.apache.flink.api.common.functions;
          +
          +import java.io.Serializable;
          +
          +/**
          + *
          + * <p>Aggregation functions must be {@link Serializable} because they are sent around
          + * between distributed processes during distributed execution.
          + *
          + * <p>An example how to use this interface is below:
          + *
          + * <pre>{@code
          + * // the accumulator, which holds the state of the in-flight aggregate
          + * public class AverageAccumulator {
          + *     long count;
          + *     long sum;
          + * }
          + *
          + * // implementation of an aggregation function for an 'average'
          + * public class Average implements AggregateFunction<Integer, AverageAccumulator, Double> {
          + *
          + *     public AverageAccumulator createAccumulator() {
          + *         return new AverageAccumulator();
          + *     }
          + *
          + *     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
          + *         a.count += b.count;
          + *         a.sum += b.sum;
          + *         return a;
          + *     }
          + *
          + *     public void add(Integer value, AverageAccumulator acc) {
          + *         acc.sum += value;
          + *         acc.count++;
          + *     }
          + *
          + *     public Double getResult(AverageAccumulator acc) {
          + *         return acc.sum / (double) acc.count;
          + *     }
          + * }
          + *
          + * // implementation of a weighted average
          + * // this reuses the same accumulator type as the aggregate function for 'average'
          + * public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
          + *
          + *     public AverageAccumulator createAccumulator() {
          + *         return new AverageAccumulator();
          + *     }
          + *
          + *     public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
          + *         a.count += b.count;
          + *         a.sum += b.sum;
          + *         return a;
          + *     }
          + *
          + *     public void add(Datum value, AverageAccumulator acc) {
          + *         acc.count += value.getWeight();
          + *         acc.sum += value.getValue();
          + *     }
          + *
          + *     public Double getResult(AverageAccumulator acc) {
          + *         return acc.sum / (double) acc.count;
          + *     }
          + * }
          + * }</pre>
          + */
          +public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
          +
          + ACC createAccumulator();
          +
          + void add(IN value, ACC accumulator);
          — End diff –

          A Table API UDAGG will eventually be translated to this windowStream API. Both accumulate and retract will be handled in this `add` function. I think it is OK if we "view retractions as adding negative values".
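One way a single `add` could handle both accumulate and retract is sketched below. This is an illustration under stated assumptions, not how the Table API translation is actually implemented: the `Change` record with its `retract` flag, the `SumCountAccumulator`, and the local copy of the interface are all hypothetical names introduced here. The sketch also merges two partial accumulators to show that retractions survive pre-aggregation.

```java
import java.io.Serializable;

// Local stand-in mirroring the interface proposed in the diff (assumption: no Flink dependency).
interface AggregateFunction<IN, ACC, OUT> extends Serializable {
    ACC createAccumulator();
    void add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Hypothetical change record: a value plus a flag saying whether it is being retracted.
class Change {
    final int value;
    final boolean retract;

    Change(int value, boolean retract) {
        this.value = value;
        this.retract = retract;
    }
}

// Accumulator holding the running sum and count.
class SumCountAccumulator {
    long count;
    long sum;
}

// Average where add() dispatches on the flag: retractions are applied with a negative sign.
class RetractableAverage implements AggregateFunction<Change, SumCountAccumulator, Double> {

    public SumCountAccumulator createAccumulator() {
        return new SumCountAccumulator();
    }

    public void add(Change change, SumCountAccumulator acc) {
        int sign = change.retract ? -1 : 1;
        acc.sum += (long) sign * change.value;
        acc.count += sign;
    }

    public Double getResult(SumCountAccumulator acc) {
        return acc.sum / (double) acc.count;
    }

    public SumCountAccumulator merge(SumCountAccumulator a, SumCountAccumulator b) {
        a.count += b.count;
        a.sum += b.sum;
        return a;
    }
}

class UdaggDemo {
    static double run() {
        RetractableAverage agg = new RetractableAverage();

        // Two partial aggregates, as they might exist on separate pre-aggregation tasks.
        SumCountAccumulator left = agg.createAccumulator();
        agg.add(new Change(4, false), left);
        agg.add(new Change(8, false), left);

        SumCountAccumulator right = agg.createAccumulator();
        agg.add(new Change(6, false), right);
        agg.add(new Change(100, false), right);
        agg.add(new Change(100, true), right); // retract the 100 again

        // Final aggregation merges the partial accumulators: sum = 18, count = 3.
        return agg.getResult(agg.merge(left, right));
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```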


            People

            • Assignee:
              StephanEwen Stephan Ewen
              Reporter:
              StephanEwen Stephan Ewen
            • Votes:
              0
              Watchers:
              5
