Details

    • Type: Sub-task
    • Status: Closed
    • Priority: Major
    • Resolution: Implemented
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Table API & SQL
    • Labels:
      None

      Description

      Retraction methods help with processing update messages, and they are also very helpful for window aggregation. This PR first adds retraction methods to the aggregate functions so that the ongoing OVER window aggregation work can benefit from them.
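
Concretely, an update message for a row can be processed as a retraction of the row's old value followed by an accumulation of its new value, so the aggregate is adjusted incrementally instead of being recomputed. In the same way, a row that leaves a bounded OVER window can simply be retracted. A minimal sketch, assuming a hypothetical `SumCountAcc` state class and `RetractDemo` helpers (neither is Flink API):

```scala
// Hypothetical accumulator, not part of the Flink API: a running sum plus
// the number of values currently contributing to it.
final class SumCountAcc {
  var sum: Long = 0L
  var count: Long = 0L
}

object RetractDemo {
  // accumulate adds a value's contribution
  def accumulate(acc: SumCountAcc, v: Long): Unit = {
    acc.sum += v
    acc.count += 1
  }

  // retract is the exact inverse of accumulate
  def retract(acc: SumCountAcc, v: Long): Unit = {
    acc.sum -= v
    acc.count -= 1
  }

  // An update message (old -> new) becomes retract(old) followed by accumulate(new)
  def update(acc: SumCountAcc, oldValue: Long, newValue: Long): Unit = {
    retract(acc, oldValue)
    accumulate(acc, newValue)
  }

  def main(args: Array[String]): Unit = {
    val acc = new SumCountAcc
    accumulate(acc, 10L)
    accumulate(acc, 5L)
    update(acc, 5L, 7L) // the row that contributed 5 now contributes 7
    println(s"sum=${acc.sum}, count=${acc.count}") // sum=17, count=2
  }
}
```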

        Issue Links

          Activity

          fhueske Fabian Hueske added a comment -

          Implemented with cd801aa5c5af0a5f1facd78c8464df9aef95f094

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3470

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3470

          Thanks for the update @shaoxuan-wang.
          PR is good to merge

          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104413472

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala —
          @@ -72,37 +72,34 @@ abstract class MaxWithRetractAggFunction[T](implicit ord: Ordering[T]) extends A

          a.f1 -= 1L
          — End diff –

          Good point

          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104413666

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala —
          @@ -115,12 +112,28 @@ abstract class MaxWithRetractAggFunction[T](implicit ord: Ordering[T]) extends A
          }

            override def merge(accumulators: JList[Accumulator]): Accumulator = {
          -   val ret = accumulators.get(0)
          +   val ret = accumulators.get(0).asInstanceOf[MaxWithRetractAccumulator[T]]
              var i: Int = 1
              while (i < accumulators.size()) {
                val a = accumulators.get(i).asInstanceOf[MaxWithRetractAccumulator[T]]
                if (a.f1 != 0) {
          -       accumulate(ret.asInstanceOf[MaxWithRetractAccumulator[T]], a.f0)
          +       val iterator = a.f2.keySet().iterator()
          +       while (iterator.hasNext()) {
          +         val key = iterator.next()
          +         //updating the resulting max value if needed
          +         if (ord.compare(ret.f0, key) < 0) {
          — End diff –

          Yes, it is not necessary to compare the max for all hash keys.

          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104411239

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.scala —
          @@ -321,12 +317,11 @@ class DecimalAvgAggFunction extends AggregateFunction[BigDecimal] {
          if (value != null) {
          val v = value.asInstanceOf[BigDecimal]
          val accum = accumulator.asInstanceOf[DecimalAvgAccumulator]
          + accum.f0 = accum.f0.subtract(v)
          + accum.f1 -= 1L
          if (accum.f1 == 0) {
          — End diff –

          This was intended to accommodate `AggFunctionTestBase` for BigDecimal comparison (BigDecimal `equals` considers not only the value but also the scale). I overlooked that we can use `BigDecimal.compareTo` to compare the values. I will remove this check and change the test cases.
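
The scale issue described above can be reproduced in a few lines: starting from `BigDecimal.ZERO`, adding and then subtracting the same value leaves a zero that carries the value's scale, so `equals` reports a mismatch while `compareTo` does not. A minimal sketch; `DecimalCompareDemo` is an illustrative name, not part of Flink:

```scala
import java.math.BigDecimal

object DecimalCompareDemo {
  // Simulates accumulating and then fully retracting one value, starting from ZERO.
  def accumulateThenRetract(v: BigDecimal): BigDecimal =
    BigDecimal.ZERO.add(v).subtract(v)

  def main(args: Array[String]): Unit = {
    val result = accumulateThenRetract(new BigDecimal("1.50"))
    println(result)                                 // 0.00 (scale 2)
    // equals compares value AND scale, so 0.00 is not equal to 0 (scale 0)
    println(result.equals(BigDecimal.ZERO))         // false
    // compareTo compares only the numeric value
    println(result.compareTo(BigDecimal.ZERO) == 0) // true
  }
}
```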

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104376293

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.scala —
          @@ -137,6 +146,15 @@ abstract class BigIntegralAvgAggFunction[T] extends AggregateFunction[T] {
          }
          }

          + override def retract(accumulator: Accumulator, value: Any): Unit = {
          — End diff –

          I see, then let's keep it as it is

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104392040

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala —
          @@ -115,12 +112,28 @@ abstract class MaxWithRetractAggFunction[T](implicit ord: Ordering[T]) extends A
          }

            override def merge(accumulators: JList[Accumulator]): Accumulator = {
          -   val ret = accumulators.get(0)
          +   val ret = accumulators.get(0).asInstanceOf[MaxWithRetractAccumulator[T]]
              var i: Int = 1
              while (i < accumulators.size()) {
                val a = accumulators.get(i).asInstanceOf[MaxWithRetractAccumulator[T]]
                if (a.f1 != 0) {
          -       accumulate(ret.asInstanceOf[MaxWithRetractAccumulator[T]], a.f0)
          +       val iterator = a.f2.keySet().iterator()
          +       while (iterator.hasNext()) {
          +         val key = iterator.next()
          +         //updating the resulting max value if needed
          +         if (ord.compare(ret.f0, key) < 0) {
          — End diff –

          I think we can simply compare the max values of both accumulators (no need to compare all keys in the hash map) and merge both hash maps:
          ```
          // set max element
          if (ord.compare(ret.f0, a.f0) < 0) {
            ret.f0 = a.f0
          }

          // merge hash maps
          val iterator = a.f2.keySet().iterator()
          while (iterator.hasNext()) {
            val key = iterator.next()
            if (ret.f2.containsKey(key)) {
              ret.f2.put(key, ret.f2.get(key) + a.f2.get(key))
            } else {
              ret.f2.put(key, a.f2.get(key))
            }
          }
          ```
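
A self-contained sketch of the merge suggested above, assuming a simplified, hypothetical `MaxAcc` class in place of `MaxWithRetractAccumulator`: `max` stands in for `f0` and `counts` for the `f2` map. Only the two maxima are compared, and the per-value counts are added up:

```scala
import java.util.{HashMap => JHashMap}

// Hypothetical stand-in for a MAX-with-retract accumulator, not Flink's class:
// `max` is the current maximum, `counts` maps each distinct value to its occurrences.
final class MaxAcc[T] {
  var max: Option[T] = None
  val counts = new JHashMap[T, java.lang.Long]()
}

object MaxMerge {
  def accumulate[T](acc: MaxAcc[T], v: T)(implicit ord: Ordering[T]): Unit = {
    val c = acc.counts.get(v)
    acc.counts.put(v, java.lang.Long.valueOf(if (c == null) 1L else c.longValue + 1L))
    if (acc.max.forall(m => ord.lt(m, v))) acc.max = Some(v)
  }

  // Merge `other` into `ret`: compare only the two maxima, then add up the counts.
  def mergeInto[T](ret: MaxAcc[T], other: MaxAcc[T])(implicit ord: Ordering[T]): Unit = {
    other.max.foreach { m =>
      if (ret.max.forall(r => ord.lt(r, m))) ret.max = Some(m)
    }
    val it = other.counts.entrySet().iterator()
    while (it.hasNext()) {
      val e = it.next()
      val existing = ret.counts.get(e.getKey)
      val merged =
        if (existing == null) e.getValue.longValue
        else existing.longValue + e.getValue.longValue
      ret.counts.put(e.getKey, java.lang.Long.valueOf(merged))
    }
  }
}
```

Keeping the maximum as an `Option` is a choice of this sketch rather than of the PR; it avoids a sentinel initial value, so an empty accumulator never wins the comparison.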

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104392189

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala —
          @@ -72,37 +72,34 @@ abstract class MinWithRetractAggFunction[T](implicit ord: Ordering[T]) extends A

            a.f1 -= 1L

          - if (!a.f2.containsKey(v)) {
          -   throw TableException("unexpected retract message")
          - } else {
          -   var count = a.f2.get(v)
          -   count -= 1L
          -   if (count == 0) {
          -     //remove the key v from the map if the number of appearance of the value v is 0
          -     a.f2.remove(v)
          -     //if the total count is 0, we could just simply set the f0(min) to the initial value
          -     if (a.f1 == 0) {
          -       a.f0 = getInitValue
          -       return
          -     }
          -     //if v is the current min value, we have to iterate the map to find the 2nd smallest
          -     // value to replace v as the min value
          -     if (v == a.f0) {
          -       val iterator = a.f2.keySet().iterator()
          -       var key = iterator.next()
          -       a.f0 = key
          -       while (iterator.hasNext()) {
          -         key = iterator.next()
          -         if (ord.compare(a.f0, key) > 0) {
          -           a.f0 = key
          -         }
          + var count = a.f2.get(v)
          + count -= 1L
          + if (count == 0) {
          +   //remove the key v from the map if the number of appearance of the value v is 0
          +   a.f2.remove(v)
          +   //if the total count is 0, we could just simply set the f0(min) to the initial value
          +   if (a.f1 == 0) {
          — End diff –

          remove `a.f1` from accumulator?
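
Dropping the total count works because the map already records whether any values remain. A minimal sketch, assuming a hypothetical `MinAcc` class (not Flink's `MinWithRetractAccumulator`) whose state is only the current minimum and the value-to-count map; emptiness is checked with `counts.isEmpty`, and the remaining keys are rescanned only when the retracted value was the minimum:

```scala
import java.util.{HashMap => JHashMap}

// Hypothetical MIN-with-retract state without a separate total-count field.
final class MinAcc[T] {
  var min: Option[T] = None
  val counts = new JHashMap[T, java.lang.Long]()
}

object MinRetract {
  def accumulate[T](acc: MinAcc[T], v: T)(implicit ord: Ordering[T]): Unit = {
    val c = acc.counts.get(v)
    acc.counts.put(v, java.lang.Long.valueOf(if (c == null) 1L else c.longValue + 1L))
    if (acc.min.forall(m => ord.gt(m, v))) acc.min = Some(v)
  }

  def retract[T](acc: MinAcc[T], v: T)(implicit ord: Ordering[T]): Unit = {
    val c = acc.counts.get(v)
    // Assumption of this sketch: retracting a value never accumulated is an error.
    if (c == null) throw new IllegalStateException(s"retracting unseen value $v")
    if (c.longValue > 1L) {
      // other occurrences of v remain, so the minimum is unchanged
      acc.counts.put(v, java.lang.Long.valueOf(c.longValue - 1L))
    } else {
      acc.counts.remove(v)
      if (acc.counts.isEmpty) {
        // no values left: checking the map replaces the total-count field
        acc.min = None
      } else if (acc.min == Some(v)) {
        // the removed value was the minimum: rescan the remaining keys
        val it = acc.counts.keySet().iterator()
        var newMin = it.next()
        while (it.hasNext()) {
          val k = it.next()
          if (ord.lt(k, newMin)) newMin = k
        }
        acc.min = Some(newMin)
      }
    }
  }
}
```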

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104389792

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.scala —
          @@ -321,12 +317,11 @@ class DecimalAvgAggFunction extends AggregateFunction[BigDecimal] {
          if (value != null) {
          val v = value.asInstanceOf[BigDecimal]
          val accum = accumulator.asInstanceOf[DecimalAvgAccumulator]
          + accum.f0 = accum.f0.subtract(v)
          + accum.f1 -= 1L
          if (accum.f1 == 0) {
          — End diff –

          Do we need this check?
          It only makes sense if we assume that accumulated and retracted values differ. But if that's the case, all values for `cnt != 0` would be considered wrong.
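
The argument can be checked with exact integral arithmetic: if every retracted value matches a previously accumulated one, retracting them all returns the sum and the count to 0 without any special branch. A minimal sketch, assuming a hypothetical `LongAvgAcc` state class and `LongAvgRetract` helpers (not Flink API), with `None` standing in for a null result:

```scala
// Hypothetical AVG-with-retract accumulator for Long input (not Flink's class).
final class LongAvgAcc {
  var sum: Long = 0L
  var count: Long = 0L
}

object LongAvgRetract {
  def accumulate(acc: LongAvgAcc, v: Long): Unit = {
    acc.sum += v
    acc.count += 1
  }

  // Exact inverse of accumulate: once every accumulated value has been retracted,
  // sum and count are both 0 again, so no "count == 0 => reset" branch is needed.
  def retract(acc: LongAvgAcc, v: Long): Unit = {
    acc.sum -= v
    acc.count -= 1
  }

  // An empty group yields no result instead of dividing by zero.
  def getValue(acc: LongAvgAcc): Option[Long] =
    if (acc.count == 0L) None else Some(acc.sum / acc.count)
}
```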

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104392251

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala —
          @@ -115,12 +112,28 @@ abstract class MinWithRetractAggFunction[T](implicit ord: Ordering[T]) extends A
          }

            override def merge(accumulators: JList[Accumulator]): Accumulator = {
          -   val ret = accumulators.get(0)
          +   val ret = accumulators.get(0).asInstanceOf[MinWithRetractAccumulator[T]]
              var i: Int = 1
              while (i < accumulators.size()) {
                val a = accumulators.get(i).asInstanceOf[MinWithRetractAccumulator[T]]
                if (a.f1 != 0) {
          -       accumulate(ret.asInstanceOf[MinWithRetractAccumulator[T]], a.f0)
          +       val iterator = a.f2.keySet().iterator()
          +       while (iterator.hasNext()) {
          +         val key = iterator.next()
          +         //updating the resulting max value if needed
          +         if (ord.compare(ret.f0, key) > 0) {
          — End diff –

          Same as for MAX.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104394062

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumWithRetractAggFunction.scala —
          @@ -0,0 +1,201 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.functions.aggfunctions
          +
          +import java.math.BigDecimal
          +import java.util.{List => JList}
          +
          +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
          +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
          +import org.apache.flink.api.java.typeutils.TupleTypeInfo
          +import org.apache.flink.table.api.TableException
          +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
          +
          +/** The initial accumulator for Sum with retract aggregate function */
          +class SumWithRetractAccumulator[T] extends JTuple2[T, Long] with Accumulator
          +
          +/**
          + * Base class for built-in Sum with retract aggregate function
          + *
          + * @tparam T the type for the aggregation result
          + */
          +abstract class SumWithRetractAggFunction[T: Numeric] extends AggregateFunction[T] {
          +
          + private val numeric = implicitly[Numeric[T]]
          +
          + override def createAccumulator(): Accumulator = {
          +   val acc = new SumWithRetractAccumulator[T]()
          +   acc.f0 = numeric.zero //sum
          +   acc.f1 = 0L //total count
          +   acc
          + }
          +
          + override def accumulate(accumulator: Accumulator, value: Any): Unit = {
          +   if (value != null) {
          +     val v = value.asInstanceOf[T]
          +     val a = accumulator.asInstanceOf[SumWithRetractAccumulator[T]]
          +     a.f0 = numeric.plus(a.f0, v)
          +     a.f1 += 1
          +   }
          + }
          +
          + override def retract(accumulator: Accumulator, value: Any): Unit = {
          +   if (value != null) {
          +     val v = value.asInstanceOf[T]
          +     val a = accumulator.asInstanceOf[SumWithRetractAccumulator[T]]
          +     a.f0 = numeric.minus(a.f0, v)
          +     a.f1 -= 1
          +   }
          + }
          +
          + override def getValue(accumulator: Accumulator): T = {
          +   val a = accumulator.asInstanceOf[SumWithRetractAccumulator[T]]
          +   if (a.f1 > 0) {
          +     a.f0
          +   } else {
          +     null.asInstanceOf[T]
          +   }
          + }
          +
          + override def merge(accumulators: JList[Accumulator]): Accumulator = {
          +   val ret = createAccumulator().asInstanceOf[SumWithRetractAccumulator[T]]
          +   var i: Int = 0
          +   while (i < accumulators.size()) {
          +     val a = accumulators.get(i).asInstanceOf[SumWithRetractAccumulator[T]]
          +     ret.f0 = numeric.plus(ret.f0, a.f0)
          +     ret.f1 += a.f1
          +     i += 1
          +   }
          +   ret
          + }
          +
          + override def getAccumulatorType(): TypeInformation[_] = {
          +   new TupleTypeInfo(
          +     (new SumWithRetractAccumulator).getClass,
          +     getValueTypeInfo,
          +     BasicTypeInfo.LONG_TYPE_INFO)
          + }
          +
          + def getValueTypeInfo: TypeInformation[_]
          +}
          +
          +/**
          + * Built-in Byte Sum with retract aggregate function
          + */
          +class ByteSumWithRetractAggFunction extends SumWithRetractAggFunction[Byte] {
          + override def getValueTypeInfo = BasicTypeInfo.BYTE_TYPE_INFO
          +}
          +
          +/**
          + * Built-in Short Sum with retract aggregate function
          + */
          +class ShortSumWithRetractAggFunction extends SumWithRetractAggFunction[Short] {
          + override def getValueTypeInfo = BasicTypeInfo.SHORT_TYPE_INFO
          +}
          +
          +/**
          + * Built-in Int Sum with retract aggregate function
          + */
          +class IntSumWithRetractAggFunction extends SumWithRetractAggFunction[Int] {
          + override def getValueTypeInfo = BasicTypeInfo.INT_TYPE_INFO
          +}
          +
          +/**
          + * Built-in Long Sum with retract aggregate function
          + */
          +class LongSumWithRetractAggFunction extends SumWithRetractAggFunction[Long] {
          + override def getValueTypeInfo = BasicTypeInfo.LONG_TYPE_INFO
          +}
          +
          +/**
          + * Built-in Float Sum with retract aggregate function
          + */
          +class FloatSumWithRetractAggFunction extends SumWithRetractAggFunction[Float] {
          + override def getValueTypeInfo = BasicTypeInfo.FLOAT_TYPE_INFO
          +}
          +
          +/**
          + * Built-in Double Sum with retract aggregate function
          + */
          +class DoubleSumWithRetractAggFunction extends SumWithRetractAggFunction[Double] {
          + override def getValueTypeInfo = BasicTypeInfo.DOUBLE_TYPE_INFO
          +}
          +
          +/** The initial accumulator for Big Decimal Sum with retract aggregate function */
          +class DecimalSumWithRetractAccumulator extends JTuple2[BigDecimal, Long] with Accumulator {
          + f0 = BigDecimal.ZERO
          + f1 = 0L
          +}
          +
          +/**
          + * Built-in Big Decimal Sum with retract aggregate function
          + */
          +class DecimalSumWithRetractAggFunction extends AggregateFunction[BigDecimal] {
          +
          + override def createAccumulator(): Accumulator = {
          +   new DecimalSumWithRetractAccumulator
          + }
          +
          + override def accumulate(accumulator: Accumulator, value: Any): Unit = {
          +   if (value != null) {
          +     val v = value.asInstanceOf[BigDecimal]
          +     val accum = accumulator.asInstanceOf[DecimalSumWithRetractAccumulator]
          +     accum.f0 = accum.f0.add(v)
          +     accum.f1 += 1L
          +   }
          + }
          +
          + override def retract(accumulator: Accumulator, value: Any): Unit = {
          + if (value != null) {
          + val v = value.asInstanceOf[BigDecimal]
          + val accum = accumulator.asInstanceOf[DecimalSumWithRetractAccumulator]
          + accum.f0 = accum.f0.subtract(v)
          + accum.f1 -= 1L
          + if (accum.f1 == 0) {
          — End diff –

          Do we need this check?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104390901

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala —
          @@ -72,37 +72,34 @@ abstract class MaxWithRetractAggFunction[T](implicit ord: Ordering[T]) extends A

          a.f1 -= 1L
          — End diff –

          I think we don't need the `f1: Long` field (total count). We can check `f2.size() == 0` instead.
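          A minimal sketch of this suggestion, under stated assumptions: `MaxAcc` and `MaxWithRetract` are hypothetical, simplified stand-ins (not the classes in the PR), the maximum is held in an `Option` instead of an initial value, and a Scala `mutable.HashMap` replaces `java.util.HashMap`. The per-value count map alone tells whether the accumulator is empty, so no separate total count is kept.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for MaxWithRetractAccumulator: there is no
// total-count field (f1); emptiness is read from the value -> count map instead.
class MaxAcc[T] {
  var max: Option[T] = None                   // current maximum, None when empty
  val counts = mutable.HashMap.empty[T, Long] // occurrences of each value
}

class MaxWithRetract[T](implicit ord: Ordering[T]) {
  def createAccumulator(): MaxAcc[T] = new MaxAcc[T]

  def accumulate(acc: MaxAcc[T], v: T): Unit = {
    if (acc.max.forall(m => ord.lt(m, v))) acc.max = Some(v)
    acc.counts(v) = acc.counts.getOrElse(v, 0L) + 1L
  }

  def retract(acc: MaxAcc[T], v: T): Unit = {
    // Assumes v was accumulated before (no out-of-date retractions).
    val c = acc.counts.getOrElse(v, throw new IllegalStateException(s"unexpected retraction of $v"))
    if (c > 1L) acc.counts(v) = c - 1L
    else {
      acc.counts.remove(v)
      if (acc.counts.isEmpty) acc.max = None // replaces the `f1 == 0` check
      else if (acc.max.contains(v)) acc.max = Some(acc.counts.keys.max(ord)) // rescan for the new max
    }
  }

  // `counts.isEmpty` plays the role of `f2.size() == 0`.
  def getValue(acc: MaxAcc[T]): Option[T] = if (acc.counts.isEmpty) None else acc.max
}
```

          The map is only rescanned when the last occurrence of the current maximum is retracted, which matches the rescan condition in the quoted diff.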

          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on the issue:

          https://github.com/apache/flink/pull/3470

          @fhueske Thanks for the review. I have addressed all your comments and updated the PR.

          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104304129

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.scala —
          @@ -137,6 +146,15 @@ abstract class BigIntegralAvgAggFunction[T] extends AggregateFunction[T] {
          }
          }

          + override def retract(accumulator: Accumulator, value: Any): Unit = {
          — End diff –

          If I create a LongAvgAggFunction directly, it will be difficult to return null as the avg result, because `null.asInstanceOf[Long]` returns 0.
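          To illustrate the point: in Scala, casting `null` to the value type `Long` yields `0L`, so a function whose result type is `Long` cannot report an empty average as `null`. The hypothetical sketch below (`AvgAcc` and `BoxedLongAvg` are illustrative names, not Flink classes) keeps a boxed `java.lang.Long` result type instead.

```scala
// Hypothetical, simplified average with retraction; not Flink's LongAvgAggFunction.
class AvgAcc {
  var sum: Long = 0L
  var count: Long = 0L
}

class BoxedLongAvg {
  def createAccumulator(): AvgAcc = new AvgAcc
  def accumulate(acc: AvgAcc, v: Long): Unit = { acc.sum += v; acc.count += 1L }
  def retract(acc: AvgAcc, v: Long): Unit = { acc.sum -= v; acc.count -= 1L }

  // A boxed result type can carry null for "no input rows".
  def getValue(acc: AvgAcc): java.lang.Long =
    if (acc.count == 0L) null else java.lang.Long.valueOf(acc.sum / acc.count)
}

object NullCastDemo {
  // With a primitive-backed result type, null silently becomes 0L.
  val unboxedNull: Long = null.asInstanceOf[Long]
}
```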

          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104304057

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala —
          @@ -35,6 +36,18 @@ abstract class AggregateFunction[T] extends UserDefinedFunction {
          def createAccumulator(): Accumulator

          /**
          + * Retract the input values from the accumulator instance.
          + *
          + * @param accumulator the accumulator which contains the current
          + * aggregated results
          + * @param input the input value (usually obtained from a new arrived data)
          — End diff –

          Ah, OK. But I think the comment above is clear enough: "Retract the input values from the accumulator instance". I think we can leave it as is, because it keeps the accumulate and retract interfaces consistent and clean. In the near future we will support (user-defined) multiple inputs for both accumulate and retract, at which point we will not control the naming of this parameter anyway.

          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104303999

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala —
          @@ -35,6 +36,18 @@ abstract class AggregateFunction[T] extends UserDefinedFunction {
          def createAccumulator(): Accumulator

          /**
          + * Retract the input values from the accumulator instance.
          — End diff –

          I was thinking further ahead, about DataStream retraction, where the source table could send out-of-date retraction messages. But let us add this assumption for now.
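          The assumption agreed on here could be written into the contract of `retract`. The trait below is a hypothetical, simplified interface (not Flink's `AggregateFunction`) whose doc comment states that only previously accumulated inputs are retracted; a null-ignoring COUNT shows `retract` mirroring `accumulate`.

```scala
// Hypothetical simplified interface; not Flink's AggregateFunction.
trait RetractableAgg[T, ACC] {
  def createAccumulator(): ACC

  /** Adds `input` to the accumulator. */
  def accumulate(acc: ACC, input: Any): Unit

  /**
   * Removes `input` from the accumulator.
   * Assumption: `input` was previously passed to `accumulate` for this accumulator
   * (e.g. a row leaving a bounded OVER window); out-of-date retractions are not handled.
   */
  def retract(acc: ACC, input: Any): Unit

  def getValue(acc: ACC): T
}

final class CountAcc { var count: Long = 0L }

// COUNT(expr) ignores nulls in both directions, so retract mirrors accumulate.
object CountWithRetract extends RetractableAgg[Long, CountAcc] {
  def createAccumulator(): CountAcc = new CountAcc
  def accumulate(acc: CountAcc, input: Any): Unit = if (input != null) acc.count += 1L
  def retract(acc: CountAcc, input: Any): Unit = if (input != null) acc.count -= 1L
  def getValue(acc: CountAcc): Long = acc.count
}
```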

          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104303912

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala —
          @@ -49,13 +50,25 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T]

          {
          val v = value.asInstanceOf[T]
          val a = accumulator.asInstanceOf[SumAccumulator[T]]
          a.f0 = numeric.plus(v, a.f0)
          - a.f1 = true
          + a.f1 += 1
          + }

          + }
          +
          + override def retract(accumulator: Accumulator, value: Any): Unit = {
          + if (value != null) {
          + val v = value.asInstanceOf[T]
          + val a = accumulator.asInstanceOf[SumAccumulator[T]]
          + a.f0 = numeric.plus(v, a.f0)
          — End diff –

          The added test, which compares two accumulators instead of just two values, will catch this.
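          A sketch of what the corrected retraction could look like, assuming a simplified `(sum, count)` accumulator rather than the PR's `SumAccumulator` (`SumAcc` and `SumWithRetract` are hypothetical names): `retract` has to undo `accumulate` with `numeric.minus`. Retracting a single value, rather than all of them, is what exposes an accidental `plus`.

```scala
// Hypothetical, simplified counterpart of the retractable sum accumulator: (sum, count).
class SumAcc[T](var sum: T, var count: Long)

class SumWithRetract[T](implicit numeric: Numeric[T]) {
  def createAccumulator(): SumAcc[T] = new SumAcc[T](numeric.zero, 0L)

  def accumulate(acc: SumAcc[T], value: Any): Unit = if (value != null) {
    acc.sum = numeric.plus(acc.sum, value.asInstanceOf[T])
    acc.count += 1L
  }

  // Retraction must undo accumulate: subtract with numeric.minus, never add.
  def retract(acc: SumAcc[T], value: Any): Unit = if (value != null) {
    acc.sum = numeric.minus(acc.sum, value.asInstanceOf[T])
    acc.count -= 1L
  }

  // SUM over zero non-null rows is NULL (None here), which is why the count is kept.
  def getValue(acc: SumAcc[T]): Option[T] = if (acc.count == 0L) None else Some(acc.sum)
}
```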

          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104302135

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala —
          @@ -36,14 +36,23 @@ abstract class AggFunctionTestBase[T] {

          def aggregator: AggregateFunction[T]

          + def ifSupportRetraction: Boolean = true
          +
          @Test

          - // test aggregate functions without partial merge
          - def testAggregateWithoutMerge(): Unit = {
          + // test aggregate and retract functions without partial merge
          + def testAccumulateAndRetractWithoutMerge(): Unit = {
          // iterate over input sets
          for ((vals, expected) <- inputValueSets.zip(expectedResults)) {
          - val accumulator = aggregateVals(vals)
          - val result = aggregator.getValue(accumulator)
          + val accumulator = accumulateVals(vals)
          + var result = aggregator.getValue(accumulator)
          validateResult(expected, result)
          +
          + if (ifSupportRetraction) {
          + retractVals(accumulator, vals)
          — End diff –

          Regarding "We also need to check the retraction of a single value, not all values (the SumAggregator retraction of all values was correct because the count was 0)":
          Instead of adding a new test that retracts just one value, I think we could add a check that compares the two accumulators (comparing the results of getValue is no longer sufficient):
          assertEquals(accumulator, resultAccum)
          After retracting all previous inputs, the accumulator should be back in its initial state. This should be good enough to validate accumulate and retract.
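          A minimal sketch of the proposed check, assuming a hypothetical `Long` sum whose accumulator is a case class so that `==` compares fields (loosely mirroring how Flink's tuple-based accumulators compare field by field). It contrasts a correct `retract` with a hypothetical buggy one that adds instead of subtracting: after retracting everything, both report "no value", but only the accumulator comparison detects the bug.

```scala
// Hypothetical accumulator: a case class, so == compares the fields.
final case class SumAcc(var sum: Long = 0L, var count: Long = 0L)

object LongSum {
  def createAccumulator(): SumAcc = SumAcc()
  def accumulate(acc: SumAcc, v: Long): Unit = { acc.sum += v; acc.count += 1L }
  def retract(acc: SumAcc, v: Long): Unit = { acc.sum -= v; acc.count -= 1L }
  // Hypothetical buggy variant: adds instead of subtracting.
  def buggyRetract(acc: SumAcc, v: Long): Unit = { acc.sum += v; acc.count -= 1L }
  // No remaining values means "no result", regardless of the stored sum.
  def getValue(acc: SumAcc): Option[Long] = if (acc.count == 0L) None else Some(acc.sum)
}

object RoundTrip {
  // Accumulates all values, retracts them again, and returns the accumulator.
  def run(vals: Seq[Long], retract: (SumAcc, Long) => Unit): SumAcc = {
    val acc = LongSum.createAccumulator()
    vals.foreach(v => LongSum.accumulate(acc, v))
    vals.foreach(v => retract(acc, v))
    acc
  }
}
```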

          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104301347

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala —
          @@ -36,14 +36,23 @@ abstract class AggFunctionTestBase[T] {

          def aggregator: AggregateFunction[T]

          + def ifSupportRetraction: Boolean = true
          +
          @Test

          - // test aggregate functions without partial merge
          - def testAggregateWithoutMerge(): Unit = {
          + // test aggregate and retract functions without partial merge
          + def testAccumulateAndRetractWithoutMerge(): Unit = {
          // iterate over input sets
          for ((vals, expected) <- inputValueSets.zip(expectedResults)) {
          - val accumulator = aggregateVals(vals)
          - val result = aggregator.getValue(accumulator)
          + val accumulator = accumulateVals(vals)
          + var result = aggregator.getValue(accumulator)
          validateResult(expected, result)
          +
          + if (ifSupportRetraction) {
          + retractVals(accumulator, vals)
          — End diff –

          Sounds good to me. I prefer to apply merge together with retraction and will add the tests.

          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104301312

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala —
          @@ -49,13 +50,25 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T]

          {
          val v = value.asInstanceOf[T]
          val a = accumulator.asInstanceOf[SumAccumulator[T]]
          a.f0 = numeric.plus(v, a.f0)
          - a.f1 = true
          + a.f1 += 1
          + }

          + }
          +
          + override def retract(accumulator: Accumulator, value: Any): Unit = {
          + if (value != null) {
          + val v = value.asInstanceOf[T]
          + val a = accumulator.asInstanceOf[SumAccumulator[T]]
          + a.f0 = numeric.plus(v, a.f0)
          + a.f1 -= 1
          + if (a.f1 < 0) {
          — End diff –

          This exception usually won't happen if we use retract for bounded OVER windows. With the ongoing DataStream retraction design, the source table can be a table with a primary key, which can generate retraction messages. The downstream streaming job may receive garbage retraction messages, because the log of the source table could contain out-of-date retractions. The original intent of throwing the exception here was to mark this case for the future retraction design. Let me add a comment here (or maybe just note it down for myself) and remove the checks for now.

          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104301172

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala —
          @@ -23,10 +23,11 @@ import java.util.{List => JList}
          import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
          import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
          import org.apache.flink.api.java.typeutils.TupleTypeInfo
          +import org.apache.flink.table.api.TableException
          import org.apache.flink.table.functions.{Accumulator, AggregateFunction}

          /** The initial accumulator for Sum aggregate function */
          -class SumAccumulator[T] extends JTuple2[T, Boolean] with Accumulator
          +class SumAccumulator[T] extends JTuple2[T, Long] with Accumulator
          — End diff –

          I am OK with having two Sum aggregates.
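          A sketch of why two variants can make sense, using hypothetical simplified accumulators whose `f0`/`f1` fields mirror the tuples in the diff: a plain sum only needs a Boolean to know whether any value arrived, while a retractable sum needs a count to know when the last value has been retracted.

```scala
// Hypothetical accumulators; f0 is the running sum in both.
final case class SumAccumulator(var f0: Long = 0L, var f1: Boolean = false)      // f1: any non-null value seen
final case class SumWithRetractAccumulator(var f0: Long = 0L, var f1: Long = 0L) // f1: number of non-null values

object PlainLongSum {
  def accumulate(a: SumAccumulator, v: Long): Unit = { a.f0 += v; a.f1 = true }
  // NULL (None) only if nothing was ever accumulated.
  def getValue(a: SumAccumulator): Option[Long] = if (a.f1) Some(a.f0) else None
}

object RetractableLongSum {
  def accumulate(a: SumWithRetractAccumulator, v: Long): Unit = { a.f0 += v; a.f1 += 1L }
  def retract(a: SumWithRetractAccumulator, v: Long): Unit = { a.f0 -= v; a.f1 -= 1L }
  // A Boolean cannot tell when the last value was retracted; a count can.
  def getValue(a: SumWithRetractAccumulator): Option[Long] = if (a.f1 > 0L) Some(a.f0) else None
}
```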

          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104301147

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala —
          @@ -0,0 +1,205 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.functions.aggfunctions
          +
          +import java.math.BigDecimal
          +import java.util.{HashMap => JHashMap, List => JList}
          +
          +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
          +import org.apache.flink.api.java.tuple.{Tuple3 => JTuple3}
          +import org.apache.flink.api.java.typeutils.{MapTypeInfo, TupleTypeInfo}
          +import org.apache.flink.table.api.TableException
          +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
          +
          +/** The initial accumulator for Max with retraction aggregate function */
          +class MaxWithRetractAccumulator[T] extends JTuple3[T, Long, JHashMap[T, Long]] with Accumulator
          +
          +/**
          + * Base class for built-in Max with retraction aggregate function
          + *
          + * @tparam T the type for the aggregation result
          + */
          +abstract class MaxWithRetractAggFunction[T](implicit ord: Ordering[T]) extends AggregateFunction[T] {
          +
          + override def createAccumulator(): Accumulator = {
          + val acc = new MaxWithRetractAccumulator[T]
          + acc.f0 = getInitValue //max
          + acc.f1 = 0L //total count
          + acc.f2 = new JHashMap[T, Long]() //store the count for each value
          + acc
          + }
          +
          + override def accumulate(accumulator: Accumulator, value: Any): Unit = {
          + if (value != null) {
          + val v = value.asInstanceOf[T]
          + val a = accumulator.asInstanceOf[MaxWithRetractAccumulator[T]]
          +
          + if (a.f1 == 0 || (ord.compare(a.f0, v) < 0)) {
          + a.f0 = v
          + }
          +
          + a.f1 += 1L
          +
          + if (!a.f2.containsKey(v)) {
          + a.f2.put(v, 1L)
          + } else {
          + var count = a.f2.get(v)
          + count += 1L
          + a.f2.put(v, count)
          + }
          + }
          + }
          +
          + override def retract(accumulator: Accumulator, value: Any): Unit = {
          + if (value != null) {
          + val v = value.asInstanceOf[T]
          + val a = accumulator.asInstanceOf[MaxWithRetractAccumulator[T]]
          +
          + a.f1 -= 1L
          +
          + if (!a.f2.containsKey(v)) {
          + throw TableException("unexpected retract message")
          + } else {
          + var count = a.f2.get(v)
          + count -= 1L
          + if (count == 0) {
          + //remove the key v from the map if the number of appearance of the value v is 0
          + a.f2.remove(v)
          + //if the total count is 0, we could just simply set the f0(max) to the initial value
          + if (a.f1 == 0) {
          + a.f0 = getInitValue
          + return
          + }
          + //if v is the current max value, we have to iterate the map to find the 2nd biggest
          + // value to replace v as the max value
          + if (v == a.f0) {
          + val iterator = a.f2.keySet().iterator()
          + var key = iterator.next()
          + a.f0 = key
          + while (iterator.hasNext()) {
          + key = iterator.next()
          + if (ord.compare(a.f0, key) < 0) {
          + a.f0 = key
          + }
          + }
          + }
          + } else {
          + a.f2.put(v, count)
          + }
          + }
          + }
          + }
          +
          + override def getValue(accumulator: Accumulator): T = {
          + val a = accumulator.asInstanceOf[MaxWithRetractAccumulator[T]]
          + if (a.f1 != 0) {
          + a.f0
          + } else {
          + null.asInstanceOf[T]
          + }
          + }
          +
          + override def merge(accumulators: JList[Accumulator]): Accumulator = {
          + val ret = accumulators.get(0)
          + var i: Int = 1
          + while (i < accumulators.size()) {
          + val a = accumulators.get(i).asInstanceOf[MaxWithRetractAccumulator[T]]
          + if (a.f1 != 0) {
          + accumulate(ret.asInstanceOf[MaxWithRetractAccumulator[T]], a.f0)
          — End diff –

          Ah, good catch! We should merge the hash map as well. Retraction is an important feature for DataStream, and we might want to always turn the retraction mechanism on. So merging the hash map is a must-have, as session windows and the future DataStream local-global aggregation will leverage the merge method.
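          A minimal sketch of merging the per-value counts, under simplifying assumptions: the state is reduced to a hypothetical `value -> count` map (`MaxCounts` is an illustrative name), and the maximum is derived from the keys on demand rather than cached in `f0` as in the diff. Merging only the other accumulators' maxima, as the quoted `merge` does, leaves values behind that can later no longer be retracted.

```scala
import scala.collection.mutable

// Hypothetical sketch: a max-with-retract state reduced to its value -> count map.
// The max is derived from the keys on demand instead of being cached in f0.
object MaxCounts {
  type Counts[T] = mutable.HashMap[T, Long]

  def of[T](values: T*): Counts[T] = {
    val c = mutable.HashMap.empty[T, Long]
    values.foreach(v => c(v) = c.getOrElse(v, 0L) + 1L)
    c
  }

  // Merge by adding the per-value counts of all other accumulators into the first one.
  def merge[T](accs: Seq[Counts[T]]): Counts[T] = {
    val ret = accs.head
    for (a <- accs.tail; (v, n) <- a) ret(v) = ret.getOrElse(v, 0L) + n
    ret
  }

  // Returns false when v is unknown, e.g. because its counts were never merged in.
  def retract[T](c: Counts[T], v: T): Boolean = c.get(v) match {
    case Some(n) if n > 1L => c(v) = n - 1L; true
    case Some(_)           => c.remove(v); true
    case None              => false
  }

  def max[T](c: Counts[T])(implicit ord: Ordering[T]): Option[T] =
    if (c.isEmpty) None else Some(c.keys.max(ord))
}
```

          For contrast, merging only the maximum 9 of an accumulator holding {9, 2} into one holding {1, 4} gives the counts {1, 4, 9}, from which a later retraction of 2 has nothing to remove.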

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104301102

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala —
          @@ -35,6 +36,18 @@ abstract class AggregateFunction[T] extends UserDefinedFunction {
          def createAccumulator(): Accumulator

          /**
          + * Retract the input values from the accumulator instance.
          + *
          + * @param accumulator the accumulator which contains the current
          + * aggregated results
          + * @param input the input value (usually obtained from a new arrived data)
          — End diff –

          I was referring to `input`, not `accumulator`.

          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104301092

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala —
          @@ -0,0 +1,205 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.functions.aggfunctions
          +
          +import java.math.BigDecimal
          +import java.util.{HashMap => JHashMap, List => JList}
          +
          +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
          +import org.apache.flink.api.java.tuple.{Tuple3 => JTuple3}
          +import org.apache.flink.api.java.typeutils.{MapTypeInfo, TupleTypeInfo}
          +import org.apache.flink.table.api.TableException
          +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
          +
          +/** The initial accumulator for Max with retraction aggregate function */
          +class MaxWithRetractAccumulator[T] extends JTuple3[T, Long, JHashMap[T, Long]] with Accumulator
          +
          +/**
          + * Base class for built-in Max with retraction aggregate function
          + *
          + * @tparam T the type for the aggregation result
          + */
          +abstract class MaxWithRetractAggFunction[T](implicit ord: Ordering[T]) extends AggregateFunction[T] {
          +
          + override def createAccumulator(): Accumulator = {
          + val acc = new MaxWithRetractAccumulator[T]
          + acc.f0 = getInitValue //max
          + acc.f1 = 0L //total count
          + acc.f2 = new JHashMap[T, Long]() //store the count for each value
          — End diff –

          Yes, the idea is borrowed from bucket sort. I think this is the right data structure for max, min, and even median.
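          The bucket-sort idea (one counter per distinct value) can be sketched outside Flink. The class `MinWithRetract` below is hypothetical and not part of the PR. It assumes non-null inputs and uses `Option` to stand in for SQL NULL. It mirrors the reviewed MAX logic for MIN: retracting a value only triggers a scan over the remaining keys when the last copy of the current minimum disappears.

```scala
import java.util.{HashMap => JHashMap}

// Hypothetical standalone sketch of MIN with retraction (not Flink's API).
// Each distinct value maps to how many times it is currently present,
// so accumulating, and retracting a non-minimal value, are O(1).
class MinWithRetract[T](implicit ord: Ordering[T]) {
  private val counts = new JHashMap[T, Long]()
  private var min: Option[T] = None

  def accumulate(v: T): Unit = {
    counts.put(v, counts.getOrDefault(v, 0L) + 1L)
    if (min.forall(m => ord.lt(v, m))) min = Some(v)
  }

  def retract(v: T): Unit = {
    val c = counts.getOrDefault(v, 0L)
    require(c > 0L, s"unexpected retract message for $v")
    if (c == 1L) {
      counts.remove(v)
      // Only when the last copy of the current minimum disappears
      // do we have to scan the remaining buckets for the new minimum.
      if (min.contains(v)) {
        var newMin: Option[T] = None
        val it = counts.keySet().iterator()
        while (it.hasNext()) {
          val k = it.next()
          if (newMin.forall(m => ord.lt(k, m))) newMin = Some(k)
        }
        min = newMin
      }
    } else {
      counts.put(v, c - 1L)
    }
  }

  // None plays the role of SQL NULL for an empty group.
  def getValue: Option[T] = min
}
```

Retracting a value that was never accumulated fails fast, like the `TableException` in the reviewed code.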

          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104301022

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.scala —
          @@ -137,6 +146,15 @@ abstract class BigIntegralAvgAggFunction[T] extends AggregateFunction[T] {
          }
          }

          + override def retract(accumulator: Accumulator, value: Any): Unit = {
          — End diff –

          I assume you are suggesting implementing `LongAvgAggFunction` directly with `JTuple2[BigInteger, Long]` as the accumulator. If so, yes, we could do that.
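          Below is a minimal sketch of what such a direct accumulator could look like. It is plain Scala rather than code against Flink's `AggregateFunction`, and the names `LongAvgAccumulator` and `LongAvgWithRetract` are hypothetical. The sum is kept as a `BigInteger` so it cannot overflow, and the result is truncated by integer division. Whether the real function rounds the same way is an assumption.

```scala
import java.math.BigInteger

// Hypothetical stand-in for a JTuple2[BigInteger, Long] accumulator:
// `sum` plays the role of f0 and `count` the role of f1.
final class LongAvgAccumulator {
  var sum: BigInteger = BigInteger.ZERO
  var count: Long = 0L
}

object LongAvgWithRetract {
  def createAccumulator(): LongAvgAccumulator = new LongAvgAccumulator

  def accumulate(acc: LongAvgAccumulator, value: java.lang.Long): Unit =
    if (value != null) {
      acc.sum = acc.sum.add(BigInteger.valueOf(value))
      acc.count += 1L
    }

  // Exact inverse of accumulate: subtract the value and decrement the count.
  def retract(acc: LongAvgAccumulator, value: java.lang.Long): Unit =
    if (value != null) {
      acc.sum = acc.sum.subtract(BigInteger.valueOf(value))
      acc.count -= 1L
    }

  // Returns null for an empty group (SQL AVG over no rows is NULL);
  // otherwise the truncated integer average.
  def getValue(acc: LongAvgAccumulator): java.lang.Long =
    if (acc.count == 0L) null
    else java.lang.Long.valueOf(acc.sum.divide(BigInteger.valueOf(acc.count)).longValue())
}
```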

          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104300989

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala —
          @@ -35,6 +36,18 @@ abstract class AggregateFunction[T] extends UserDefinedFunction {
          def createAccumulator(): Accumulator

          /**
          + * Retract the input values from the accumulator instance.
          + *
          + * @param accumulator the accumulator which contains the current
          + * aggregated results
          + * @param input the input value (usually obtained from a new arrived data)
          — End diff –

          I think it is OK to use "accumulator" here: `retract` is a method that retracts the value from the accumulator.
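          Here is a way to make the accumulate/retract contract concrete. In an over window, each new row is accumulated, and the row that falls out of the window is retracted from the same accumulator. The trait `RetractableAgg`, the `LongSum` object, and `OverWindow.slidingAggregate` below are hypothetical stand-ins, not Flink's API. The sketch only assumes a ROWS-based window of fixed size.

```scala
import scala.collection.mutable

// Hypothetical minimal interface mirroring the accumulate/retract split;
// it is not Flink's AggregateFunction API.
trait RetractableAgg[Acc, T] {
  def createAccumulator(): Acc
  def accumulate(acc: Acc, value: T): Unit
  def retract(acc: Acc, value: T): Unit
  def getValue(acc: Acc): T
}

final class LongSumAcc { var sum: Long = 0L }

// A SUM over Long values: retract undoes exactly what accumulate did.
object LongSum extends RetractableAgg[LongSumAcc, Long] {
  def createAccumulator(): LongSumAcc = new LongSumAcc
  def accumulate(acc: LongSumAcc, value: Long): Unit = acc.sum += value
  def retract(acc: LongSumAcc, value: Long): Unit = acc.sum -= value
  def getValue(acc: LongSumAcc): Long = acc.sum
}

object OverWindow {
  // Emits one result per input row over the last `rows` rows, similar to
  // ROWS BETWEEN (rows - 1) PRECEDING AND CURRENT ROW: the new row is
  // accumulated and the row leaving the window is retracted, so the
  // accumulator is never rebuilt from scratch.
  def slidingAggregate[Acc, T](input: Seq[T], rows: Int, agg: RetractableAgg[Acc, T]): Seq[T] = {
    require(rows > 0, "window must contain at least one row")
    val acc = agg.createAccumulator()
    val window = mutable.Queue.empty[T]
    val out = Vector.newBuilder[T]
    input.foreach { v =>
      agg.accumulate(acc, v)
      window.enqueue(v)
      if (window.size > rows) agg.retract(acc, window.dequeue())
      out += agg.getValue(acc)
    }
    out.result()
  }
}
```

For example, `OverWindow.slidingAggregate(Seq(1L, 2L, 3L, 4L, 5L), 3, LongSum)` yields the running three-row sums 1, 3, 6, 9, 12.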

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104298377

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala —
          @@ -148,12 +161,24 @@ class DecimalSumAggFunction extends AggregateFunction[BigDecimal] {
          val v = value.asInstanceOf[BigDecimal]
          val accum = accumulator.asInstanceOf[DecimalSumAccumulator]
          accum.f0 = accum.f0.add(v)
          - accum.f1 = true
          + accum.f1 += 1L
          + }
          + }
          +
          + override def retract(accumulator: Accumulator, value: Any): Unit = {
          + if (value != null) {
          + val v = value.asInstanceOf[BigDecimal]
          + val accum = accumulator.asInstanceOf[DecimalSumAccumulator]
          + accum.f0 = accum.f0.add(v)
          — End diff –

          `subtract`, not `add`
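          Here is a sketch of the corrected decimal retraction in plain Scala. The names `DecimalSumAcc` and `DecimalSumWithRetract` are hypothetical, modelled loosely on the `DecimalSumAccumulator` fields in the diff (`f0` as the running sum, `f1` as the count of non-null values). `retract` subtracts the value and decrements the count, so a group whose inputs have all been retracted reports NULL again.

```scala
import java.math.BigDecimal

// Hypothetical stand-in for DecimalSumAccumulator: sum ~ f0, count ~ f1.
final class DecimalSumAcc {
  var sum: BigDecimal = BigDecimal.ZERO
  var count: Long = 0L
}

object DecimalSumWithRetract {
  def accumulate(acc: DecimalSumAcc, value: BigDecimal): Unit =
    if (value != null) {
      acc.sum = acc.sum.add(value)
      acc.count += 1L
    }

  // The inverse of accumulate: subtract (not add) and decrement the count.
  def retract(acc: DecimalSumAcc, value: BigDecimal): Unit =
    if (value != null) {
      acc.sum = acc.sum.subtract(value)
      acc.count -= 1L
    }

  // SUM over no remaining non-null inputs is NULL.
  def getValue(acc: DecimalSumAcc): BigDecimal =
    if (acc.count == 0L) null else acc.sum
}
```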

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104298146

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala —
          @@ -23,10 +23,11 @@ import java.util.{List => JList}
          import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
          import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
          import org.apache.flink.api.java.typeutils.TupleTypeInfo
          +import org.apache.flink.table.api.TableException
          import org.apache.flink.table.functions.{Accumulator, AggregateFunction}

          /** The initial accumulator for Sum aggregate function */
          -class SumAccumulator[T] extends JTuple2[T, Boolean] with Accumulator
          +class SumAccumulator[T] extends JTuple2[T, Long] with Accumulator
          — End diff –

          I think we should also offer `SumAccumulator` with and without retraction. The difference between `Long` and `Boolean` is 7 bytes, which is a lot of additional overhead for most types.
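          The trade-off can be sketched with two accumulator variants. All names below are hypothetical, and `Option` stands in for SQL NULL. The plain variant keeps the `Boolean` "seen any input" flag. The retracting variant keeps a `Long` count, because once values can be retracted a flag cannot tell whether any input is still present. The retracting variant also uses `Numeric.minus` in `retract`.

```scala
// Hypothetical accumulators: a Boolean flag suffices without retraction,
// a Long count is needed with it.
final class SumAcc[T](var sum: T, var seen: Boolean)
final class SumWithRetractAcc[T](var sum: T, var count: Long)

class SumAgg[T](implicit num: Numeric[T]) {
  def createAccumulator(): SumAcc[T] = new SumAcc[T](num.zero, false)
  def accumulate(acc: SumAcc[T], v: T): Unit = {
    acc.sum = num.plus(acc.sum, v)
    acc.seen = true
  }
  def getValue(acc: SumAcc[T]): Option[T] = if (acc.seen) Some(acc.sum) else None
}

class SumWithRetractAgg[T](implicit num: Numeric[T]) {
  def createAccumulator(): SumWithRetractAcc[T] = new SumWithRetractAcc[T](num.zero, 0L)
  def accumulate(acc: SumWithRetractAcc[T], v: T): Unit = {
    acc.sum = num.plus(acc.sum, v)
    acc.count += 1L
  }
  // Retraction must use minus, not plus, and decrement the count.
  def retract(acc: SumWithRetractAcc[T], v: T): Unit = {
    acc.sum = num.minus(acc.sum, v)
    acc.count -= 1L
  }
  def getValue(acc: SumWithRetractAcc[T]): Option[T] =
    if (acc.count > 0L) Some(acc.sum) else None
}
```

Only the retracting variant pays for the wider count field. That is the point of offering both.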

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104298179

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala —
          @@ -49,13 +50,25 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] {
          val v = value.asInstanceOf[T]
          val a = accumulator.asInstanceOf[SumAccumulator[T]]
          a.f0 = numeric.plus(v, a.f0)
          - a.f1 = true
          + a.f1 += 1
          + }
          + }
          +
          + override def retract(accumulator: Accumulator, value: Any): Unit = {
          + if (value != null) {
          + val v = value.asInstanceOf[T]
          + val a = accumulator.asInstanceOf[SumAccumulator[T]]
          + a.f0 = numeric.plus(v, a.f0)
          — End diff –

          `minus`, not `plus`

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104298390

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala —
          @@ -148,12 +161,24 @@ class DecimalSumAggFunction extends AggregateFunction[BigDecimal] {
          val v = value.asInstanceOf[BigDecimal]
          val accum = accumulator.asInstanceOf[DecimalSumAccumulator]
          accum.f0 = accum.f0.add(v)
          - accum.f1 = true
          + accum.f1 += 1L
          + }
          + }
          +
          + override def retract(accumulator: Accumulator, value: Any): Unit = {
          + if (value != null) {
          + val v = value.asInstanceOf[BigDecimal]
          + val accum = accumulator.asInstanceOf[DecimalSumAccumulator]
          + accum.f0 = accum.f0.add(v)
          + accum.f1 -= 1L
          + if (accum.f1 < 0) {
          — End diff –

          remove?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104298193

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala —
          @@ -49,13 +50,25 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] {
          val v = value.asInstanceOf[T]
          val a = accumulator.asInstanceOf[SumAccumulator[T]]
          a.f0 = numeric.plus(v, a.f0)
          - a.f1 = true
          + a.f1 += 1
          + }
          + }
          +
          + override def retract(accumulator: Accumulator, value: Any): Unit = {
          + if (value != null) {
          + val v = value.asInstanceOf[T]
          + val a = accumulator.asInstanceOf[SumAccumulator[T]]
          + a.f0 = numeric.plus(v, a.f0)
          — End diff –

          Please check the tests. This should have been caught.
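          A round-trip check is one kind of test that would catch `plus` being used in `retract`. Accumulate a sequence, retract a subset, and compare the result against accumulating only the remaining values. The sketch below is hypothetical and uses its own tiny sum accumulator in place of the real function under test.

```scala
// Hypothetical round-trip check: accumulating `all` and then retracting
// `retracted` must match accumulating only what is left.
object RetractRoundTrip {
  // Minimal sum-with-retract accumulator standing in for the function under test.
  final class Acc(var sum: Long = 0L, var count: Long = 0L)

  def accumulate(a: Acc, v: Long): Unit = { a.sum += v; a.count += 1 }
  def retract(a: Acc, v: Long): Unit = { a.sum -= v; a.count -= 1 }
  def value(a: Acc): Option[Long] = if (a.count > 0) Some(a.sum) else None

  def check(all: Seq[Long], retracted: Seq[Long]): Boolean = {
    val withRetract = new Acc()
    all.foreach(accumulate(withRetract, _))
    retracted.foreach(retract(withRetract, _))

    // Multiset difference: the values that should still be "in" the group.
    val expected = new Acc()
    all.diff(retracted).foreach(accumulate(expected, _))
    value(withRetract) == value(expected)
  }
}
```

If `retract` used `a.sum += v`, then `check(Seq(1L, 2L, 3L), Seq(2L))` would return `false`, because it would compare 8 against 4.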

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104298418

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala —
          @@ -556,7 +566,9 @@ object AggregateUtil {
          private def transformToAggregateFunctions(
          aggregateCalls: Seq[AggregateCall],
          inputType: RelDataType,
          - groupKeysCount: Int): (Array[Int], Array[TableAggregateFunction[_ <: Any]]) = {
          + groupKeysCount: Int,
          + ifNeedRetraction: Boolean)
          — End diff –

          rename to `needsRetraction`?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104297801

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala —
          @@ -0,0 +1,205 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.functions.aggfunctions
          +
          +import java.math.BigDecimal
          +import java.util.{HashMap => JHashMap, List => JList}
          +
          +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
          +import org.apache.flink.api.java.tuple.{Tuple3 => JTuple3}
          +import org.apache.flink.api.java.typeutils.{MapTypeInfo, TupleTypeInfo}
          +import org.apache.flink.table.api.TableException
          +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
          +
          +/** The initial accumulator for Max with retraction aggregate function */
          +class MaxWithRetractAccumulator[T] extends JTuple3[T, Long, JHashMap[T, Long]] with Accumulator
          +
          +/**
          + * Base class for built-in Max with retraction aggregate function
          + *
          + * @tparam T the type for the aggregation result
          + */
          +abstract class MaxWithRetractAggFunction[T](implicit ord: Ordering[T]) extends AggregateFunction[T] {
          +
          + override def createAccumulator(): Accumulator = {
          + val acc = new MaxWithRetractAccumulator[T]
          + acc.f0 = getInitValue //max
          + acc.f1 = 0L //total count
          + acc.f2 = new JHashMap[T, Long]() //store the count for each value
          + acc
          + }
          +
          + override def accumulate(accumulator: Accumulator, value: Any): Unit = {
          + if (value != null) {
          + val v = value.asInstanceOf[T]
          + val a = accumulator.asInstanceOf[MaxWithRetractAccumulator[T]]
          +
          + if (a.f1 == 0 || (ord.compare(a.f0, v) < 0)) {
          + a.f0 = v
          + }
          +
          + a.f1 += 1L
          +
          + if (!a.f2.containsKey(v)) {
          + a.f2.put(v, 1L)
          + } else {
          + var count = a.f2.get(v)
          + count += 1L
          + a.f2.put(v, count)
          + }
          + }
          + }
          +
          + override def retract(accumulator: Accumulator, value: Any): Unit = {
          + if (value != null) {
          + val v = value.asInstanceOf[T]
          + val a = accumulator.asInstanceOf[MaxWithRetractAccumulator[T]]
          +
          + a.f1 -= 1L
          +
          + if (!a.f2.containsKey(v)) {
          + throw TableException("unexpected retract message")
          + } else {
          + var count = a.f2.get(v)
          + count -= 1L
          + if (count == 0) {
          + //remove the key v from the map if the number of appearance of the value v is 0
          + a.f2.remove(v)
          + //if the total count is 0, we could just simply set the f0(max) to the initial value
          + if (a.f1 == 0) {
          + a.f0 = getInitValue
          + return
          + }
          + //if v is the current max value, we have to iterate the map to find the 2nd biggest
          + // value to replace v as the max value
          + if (v == a.f0) {
          + val iterator = a.f2.keySet().iterator()
          + var key = iterator.next()
          + a.f0 = key
          + while (iterator.hasNext()) {
          + key = iterator.next()
          + if (ord.compare(a.f0, key) < 0) {
          + a.f0 = key
          + }
          + }
          + }
          + } else {
          + a.f2.put(v, count)
          + }
          + }
          + }
          + }
          +
          + override def getValue(accumulator: Accumulator): T = {
          + val a = accumulator.asInstanceOf[MaxWithRetractAccumulator[T]]
          + if (a.f1 != 0) {
          + a.f0
          + } else {
          + null.asInstanceOf[T]
          + }
          + }
          +
          + override def merge(accumulators: JList[Accumulator]): Accumulator = {
          + val ret = accumulators.get(0)
          + var i: Int = 1
          + while (i < accumulators.size()) {
          + val a = accumulators.get(i).asInstanceOf[MaxWithRetractAccumulator[T]]
          + if (a.f1 != 0) {
          + accumulate(ret.asInstanceOf[MaxWithRetractAccumulator[T]], a.f0)
          — End diff –

          This assumes that `accumulate` and `retract` are never called on merged accumulators (otherwise we would need to merge the hash maps, too). If that is the case, we should state it in the documentation of the `merge` function in `AggregateFunction` and clear the hash map.
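          If accumulators can still receive `accumulate`/`retract` calls after a merge, the per-value counts have to be merged as well. The sketch below uses a hypothetical plain-Scala stand-in for `MaxWithRetractAccumulator`, called `MaxAcc`, with `max`, `total`, and `counts` in place of `f0`, `f1`, and `f2`. It illustrates the alternative the comment raises and is not what the PR implements.

```scala
import java.util.{HashMap => JHashMap, List => JList}

// Hypothetical stand-in for MaxWithRetractAccumulator:
// max ~ f0 (None when empty), total ~ f1, counts ~ f2.
final class MaxAcc[T](var max: Option[T] = None, var total: Long = 0L,
                      val counts: JHashMap[T, Long] = new JHashMap[T, Long]())

object MaxMerge {
  // Merges all accumulators into the first one, combining the per-value
  // counts so that a later retract still finds every value it may remove.
  def merge[T](accs: JList[MaxAcc[T]])(implicit ord: Ordering[T]): MaxAcc[T] = {
    val ret = accs.get(0)
    var i = 1
    while (i < accs.size()) {
      val a = accs.get(i)
      val it = a.counts.entrySet().iterator()
      while (it.hasNext()) {
        val e = it.next()
        ret.counts.put(e.getKey(), ret.counts.getOrDefault(e.getKey(), 0L) + e.getValue())
      }
      ret.total += a.total
      ret.max = (ret.max ++ a.max).reduceOption(ord.max)
      i += 1
    }
    ret
  }
}
```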

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104297207

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.scala —
          @@ -137,6 +146,15 @@ abstract class BigIntegralAvgAggFunction[T] extends AggregateFunction[T] {
          }
          }

          + override def retract(accumulator: Accumulator, value: Any): Unit = {
          — End diff –

          Can we implement `LongAvgAggFunction` without the indirection via `BigIntegralAvgAggFunction`? `BigIntegralAvgAggFunction` is not used anywhere else.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104298708

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala —
          @@ -0,0 +1,205 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.functions.aggfunctions
          +
          +import java.math.BigDecimal
          +import java.util.{HashMap => JHashMap, List => JList}
          +
          +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
          +import org.apache.flink.api.java.tuple.{Tuple3 => JTuple3}
          +import org.apache.flink.api.java.typeutils.{MapTypeInfo, TupleTypeInfo}
          +import org.apache.flink.table.api.TableException
          +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
          +
          +/** The initial accumulator for Max with retraction aggregate function */
          +class MaxWithRetractAccumulator[T] extends JTuple3[T, Long, JHashMap[T, Long]] with Accumulator
          +
          +/**
          + * Base class for built-in Max with retraction aggregate function
          + *
          + * @tparam T the type for the aggregation result
          + */
          +abstract class MaxWithRetractAggFunction[T](implicit ord: Ordering[T]) extends AggregateFunction[T] {
          +
          + override def createAccumulator(): Accumulator = {
          + val acc = new MaxWithRetractAccumulator[T]
          + acc.f0 = getInitValue //max
          + acc.f1 = 0L //total count
          + acc.f2 = new JHashMap[T, Long]() //store the count for each value
          — End diff –

          I spent some time thinking about whether a `HashMap` is the best data structure here, because searching for the second-largest element is quite expensive. But since most values will be added (O(1)) and retracted without changing the max element (also O(1)), a `HashMap` should be a good choice. I noticed that the `MapSerializer` of the `MapTypeInfo` serializes an additional `Boolean` for value null checks. This is unnecessary overhead for us, but should be OK for now, IMO.
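          To make the cost argument concrete, here is a minimal standalone sketch of the count-per-value idea. It is plain Scala, not Flink's actual `MaxWithRetractAggFunction`, and the class name `MaxWithRetractSketch` is hypothetical. Accumulating a value, or retracting one that is not the last copy of the maximum, is a single map update. Only when the last copy of the current maximum is retracted does the sketch scan the map, which costs O(n).

```scala
import scala.collection.mutable

// Hypothetical sketch of a retractable MAX: keeps the current maximum plus a
// count per distinct value (the role of the JHashMap[T, Long] in the diff).
final class MaxWithRetractSketch[T](implicit ord: Ordering[T]) {
  private var currentMax: Option[T] = None
  private val counts = mutable.HashMap.empty[T, Long]

  def accumulate(value: T): Unit = {
    counts.update(value, counts.getOrElse(value, 0L) + 1L) // expected O(1)
    if (currentMax.forall(m => ord.gt(value, m))) currentMax = Some(value)
  }

  // Assumes only previously accumulated values are retracted.
  def retract(value: T): Unit = counts.get(value) match {
    case Some(c) if c > 1L =>
      counts.update(value, c - 1L) // another copy remains, max unchanged: O(1)
    case Some(_) =>
      counts.remove(value)
      // Only when the last copy of the max disappears do we rescan: O(n).
      if (currentMax.contains(value))
        currentMax = if (counts.isEmpty) None else Some(counts.keys.max(ord))
    case None => () // nothing to retract
  }

  def getValue: Option[T] = currentMax
}
```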

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104298605

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala —
          @@ -35,6 +36,18 @@ abstract class AggregateFunction[T] extends UserDefinedFunction {
          def createAccumulator(): Accumulator

          /**
          + * Retract the input values from the accumulator instance.
          — End diff –

          Do we want to mention that only previously `accumulated` values need to be `retracted`?
          Users should not check for that but can use this assumption to implement more efficient functions.
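          One way to document the contract suggested here is shown below on a hypothetical minimal interface. `RetractableAgg`, `CountAcc`, and `CountWithRetract` are illustrative names, not Flink's `AggregateFunction`. The Scaladoc states that only previously accumulated values are retracted, and a simple count can then rely on that and skip defensive checks.

```scala
// Hypothetical minimal interface, used only to illustrate the documented contract;
// it is not Flink's AggregateFunction.
trait RetractableAgg[ACC, IN, OUT] {
  def createAccumulator(): ACC

  /** Adds `value` to the accumulator. */
  def accumulate(acc: ACC, value: IN): Unit

  /**
   * Removes `value` from the accumulator.
   *
   * Only values that were previously passed to `accumulate` for this
   * accumulator are retracted. Implementations do not need to check this
   * and may rely on it to be more efficient.
   */
  def retract(acc: ACC, value: IN): Unit

  def getValue(acc: ACC): OUT
}

final class CountAcc { var count: Long = 0L }

// A COUNT that relies on the contract: no guard against the count going negative.
object CountWithRetract extends RetractableAgg[CountAcc, Any, Long] {
  def createAccumulator(): CountAcc = new CountAcc
  def accumulate(acc: CountAcc, value: Any): Unit = if (value != null) acc.count += 1
  def retract(acc: CountAcc, value: Any): Unit = if (value != null) acc.count -= 1
  def getValue(acc: CountAcc): Long = acc.count
}
```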

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104298400

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala —
          @@ -165,9 +190,9 @@ class DecimalSumAggFunction extends AggregateFunction[BigDecimal] {
          var i: Int = 1
          while (i < accumulators.size()) {
          val a = accumulators.get(i).asInstanceOf[DecimalSumAccumulator]
          - if (a.f1) {
          + if (a.f1 > 0) {
          — End diff –

          remove check?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104298555

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala —
          @@ -36,14 +36,23 @@ abstract class AggFunctionTestBase[T] {

          def aggregator: AggregateFunction[T]

          + def ifSupportRetraction: Boolean = true
          +
          @Test
          - // test aggregate functions without partial merge
          - def testAggregateWithoutMerge(): Unit = {
          + // test aggregate and retract functions without partial merge
          + def testAccumulateAndRetractWithoutMerge(): Unit = {
          // iterate over input sets
          for ((vals, expected) <- inputValueSets.zip(expectedResults)) {
          - val accumulator = aggregateVals(vals)
          - val result = aggregator.getValue(accumulator)
          + val accumulator = accumulateVals(vals)
          + var result = aggregator.getValue(accumulator)
          validateResult(expected, result)
          +
          + if (ifSupportRetraction) {
          + retractVals(accumulator, vals)
          — End diff –

          If we want `accumulate` and `merge` to be usable after `merge` was applied, we should add tests for that as well.
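          A sketch of such a test follows. It assumes the intent is that an accumulator stays usable for further `accumulate` and `merge` calls after values have been retracted from it. `SumAcc`, `SumWithRetract`, and `RetractThenReuseCheck` are hypothetical standalone names, not part of `AggFunctionTestBase`.

```scala
// Hypothetical sum accumulator with retraction and merge, used to sketch the test.
final class SumAcc(var sum: Long = 0L, var count: Long = 0L)

object SumWithRetract {
  def accumulate(acc: SumAcc, v: java.lang.Long): Unit =
    if (v != null) { acc.sum += v.longValue(); acc.count += 1 }
  def retract(acc: SumAcc, v: java.lang.Long): Unit =
    if (v != null) { acc.sum -= v.longValue(); acc.count -= 1 }
  def merge(into: SumAcc, other: SumAcc): Unit = {
    into.sum += other.sum
    into.count += other.count
  }
  // Empty input yields no result, like SQL SUM over zero rows.
  def getValue(acc: SumAcc): Option[Long] = if (acc.count == 0) None else Some(acc.sum)
}

object RetractThenReuseCheck {
  // Accumulate, retract everything, then keep using the same accumulator.
  def check(vals: Seq[java.lang.Long]): Unit = {
    val acc = new SumAcc
    vals.foreach(SumWithRetract.accumulate(acc, _))
    vals.foreach(SumWithRetract.retract(acc, _))
    assert(SumWithRetract.getValue(acc).isEmpty)

    // accumulate after retract must behave like on a fresh accumulator
    vals.foreach(SumWithRetract.accumulate(acc, _))
    val fresh = new SumAcc
    vals.foreach(SumWithRetract.accumulate(fresh, _))
    assert(SumWithRetract.getValue(acc) == SumWithRetract.getValue(fresh))

    // merge after retract must also work
    val other = new SumAcc
    vals.foreach(SumWithRetract.accumulate(other, _))
    SumWithRetract.merge(acc, other)
    assert(SumWithRetract.getValue(acc) == SumWithRetract.getValue(fresh).map(_ * 2))
  }
}
```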

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104298261

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala —
          @@ -49,13 +50,25 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] {
          val v = value.asInstanceOf[T]
          val a = accumulator.asInstanceOf[SumAccumulator[T]]
          a.f0 = numeric.plus(v, a.f0)
          - a.f1 = true
          + a.f1 += 1
          + }
          + }
          +
          + override def retract(accumulator: Accumulator, value: Any): Unit = {
          + if (value != null) {
          + val v = value.asInstanceOf[T]
          + val a = accumulator.asInstanceOf[SumAccumulator[T]]
          + a.f0 = numeric.plus(v, a.f0)
          + a.f1 -= 1
          + if (a.f1 < 0) {
          — End diff –

          Do we want to check for these errors? It adds overhead and is the responsibility of the runtime to call the functions correctly. I'd rather add tests with custom aggregation functions to verify the behavior instead of adding overhead to the most widely used aggregation functions. We didn't add these checks to `CountAggregate` and `AverageAggregate`.
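          Below is a sketch of the lighter-weight variant suggested here. It is standalone Scala with hypothetical names, not the actual Flink classes, and `retract` mirrors `accumulate` without validating the count. The quoted diff line updates the sum with `numeric.plus(v, a.f0)` inside `retract`. A retraction has to subtract, so the sketch uses `numeric.minus` instead.

```scala
// Hypothetical accumulator mirroring the diff: f0 = running sum, f1 = count.
final class SumAccumulatorSketch[T](var f0: T, var f1: Long)

final class SumWithRetractSketch[T](implicit numeric: Numeric[T]) {
  def createAccumulator(): SumAccumulatorSketch[T] =
    new SumAccumulatorSketch[T](numeric.zero, 0L)

  def accumulate(a: SumAccumulatorSketch[T], value: Any): Unit =
    if (value != null) {
      a.f0 = numeric.plus(a.f0, value.asInstanceOf[T])
      a.f1 += 1
    }

  // No `a.f1 < 0` check: the runtime is responsible for only retracting
  // values that were accumulated before.
  def retract(a: SumAccumulatorSketch[T], value: Any): Unit =
    if (value != null) {
      a.f0 = numeric.minus(a.f0, value.asInstanceOf[T]) // subtract, not plus
      a.f1 -= 1
    }

  def getValue(a: SumAccumulatorSketch[T]): Option[T] =
    if (a.f1 == 0) None else Some(a.f0)
}
```

          Misuse by the runtime would then be caught by dedicated tests rather than by a branch on every call.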

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104297285

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/AvgAggFunction.scala —
          @@ -290,6 +317,19 @@ class DecimalAvgAggFunction extends AggregateFunction[BigDecimal] {
          }
          }

          + override def retract(accumulator: Accumulator, value: Any): Unit = {
          + if (value != null) {
          + val v = value.asInstanceOf[BigDecimal]
          + val accum = accumulator.asInstanceOf[DecimalAvgAccumulator]
          + if (accum.f1 == 0) {
          + accum.f0 = v
          — End diff –

          should be `accum.f0 = v.negate()`.

          I think we can even remove the `if (accum.f1 == 0)` condition and always subtract since `accum.f0` is initialized with `ZERO`. Same would apply for `accumulate`.
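          The simplification can be sketched as follows, using hypothetical standalone names (`DecimalAvgAcc`, `DecimalAvgSketch`) rather than Flink's `DecimalAvgAggFunction`. Because the sum starts at `BigDecimal.ZERO`, both `accumulate` and `retract` can always add or subtract, and subtracting the first value from zero already yields `v.negate()`. The `MathContext.DECIMAL128` rounding in `getValue` is an assumption for the sketch.

```scala
import java.math.{BigDecimal, MathContext}

// Hypothetical accumulator mirroring DecimalAvgAccumulator: f0 = sum, f1 = count.
final class DecimalAvgAcc(var f0: BigDecimal = BigDecimal.ZERO, var f1: Long = 0L)

object DecimalAvgSketch {
  // No `if (acc.f1 == 0)` branch: f0 starts at ZERO, so adding always works.
  def accumulate(acc: DecimalAvgAcc, value: Any): Unit =
    if (value != null) {
      acc.f0 = acc.f0.add(value.asInstanceOf[BigDecimal])
      acc.f1 += 1
    }

  // Same reasoning: subtracting from ZERO already yields v.negate().
  def retract(acc: DecimalAvgAcc, value: Any): Unit =
    if (value != null) {
      acc.f0 = acc.f0.subtract(value.asInstanceOf[BigDecimal])
      acc.f1 -= 1
    }

  def getValue(acc: DecimalAvgAcc): Option[BigDecimal] =
    if (acc.f1 == 0) None
    else Some(acc.f0.divide(BigDecimal.valueOf(acc.f1), MathContext.DECIMAL128))
}
```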

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104296481

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/AggregateFunction.scala —
          @@ -35,6 +36,18 @@ abstract class AggregateFunction[T] extends UserDefinedFunction {
          def createAccumulator(): Accumulator

          /**
          + * Retract the input values from the accumulator instance.
          + *
          + * @param accumulator the accumulator which contains the current
          + * aggregated results
          + * @param input the input value (usually obtained from a new arrived data)
          — End diff –

          rename parameter to `retraction`?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104298360

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/SumAggFunction.scala —
          @@ -67,9 +80,9 @@ abstract class SumAggFunction[T: Numeric] extends AggregateFunction[T] {
          var i: Int = 0
          while (i < accumulators.size()) {
          val a = accumulators.get(i).asInstanceOf[SumAccumulator[T]]
          - if (a.f1) {
          + if (a.f1 > 0) {
          — End diff –

          Do we need this check? If `a.f1` is `0`, `a.f0` should be `0` as well (if we only retract what was added before) and we can simply add sum and count.
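          A guard-free merge might look like the sketch below. `PartialSum` and `SumMergeSketch` are hypothetical, and the sketch uses a Scala `Seq` where Flink passes a `java.util.List`. It relies on the assumption stated in the comment: an accumulator with `f1 == 0` also has `f0 == 0`, so folding it in changes nothing.

```scala
// Hypothetical partial sum: f0 = running sum, f1 = number of non-null values.
final class PartialSum[T](var f0: T, var f1: Long)

object SumMergeSketch {
  // No `if (a.f1 > 0)` guard: if only previously accumulated values are
  // retracted, an accumulator with f1 == 0 also has f0 == 0, so adding it is harmless.
  def merge[T](target: PartialSum[T], others: Seq[PartialSum[T]])(implicit num: Numeric[T]): Unit =
    others.foreach { a =>
      target.f0 = num.plus(target.f0, a.f0)
      target.f1 += a.f1
    }
}
```

          One caveat: for floating-point types, a sum of values that were accumulated and then retracted is not always exactly zero (for example, 0.1 + 0.2 - 0.1 - 0.2 in `Double` leaves a tiny residue). Whether the guard can be dropped for `Double` and `Float` sums is therefore worth checking separately.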

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3470#discussion_r104298517

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala —
          @@ -36,14 +36,23 @@ abstract class AggFunctionTestBase[T] {

          def aggregator: AggregateFunction[T]

          + def ifSupportRetraction: Boolean = true
          +
          @Test
          - // test aggregate functions without partial merge
          - def testAggregateWithoutMerge(): Unit = {
          + // test aggregate and retract functions without partial merge
          + def testAccumulateAndRetractWithoutMerge(): Unit = {
          // iterate over input sets
          for ((vals, expected) <- inputValueSets.zip(expectedResults)) {
          - val accumulator = aggregateVals(vals)
          - val result = aggregator.getValue(accumulator)
          + val accumulator = accumulateVals(vals)
          + var result = aggregator.getValue(accumulator)
          validateResult(expected, result)
          +
          + if (ifSupportRetraction) {
          + retractVals(accumulator, vals)
          — End diff –

          We also need to check the retraction of a single value, not just of all values (retracting all values passed for the `SumAggregator` only because the count was `0`).
          For that we can accumulate all values and retract one value and compare the result against the result of accumulating all values except the retracted one.

          Also check that retracting `null` does not have an effect.
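          The test strategy described above can be sketched as a generic helper. `SingleRetractionCheck.check` is hypothetical and takes the aggregate's operations as plain functions rather than an `AggregateFunction`. For each index it accumulates all values, retracts that one value, and compares the result with accumulating every value except it. It then verifies that retracting `null` leaves the result unchanged. `IN` should be a reference type (for example `java.lang.Long`) so that passing `null` is meaningful.

```scala
object SingleRetractionCheck {
  def check[ACC, IN, OUT](
      vals: Seq[IN],
      create: () => ACC,
      accumulate: (ACC, IN) => Unit,
      retract: (ACC, IN) => Unit,
      getValue: ACC => OUT): Unit = {

    def accumulateAll(xs: Seq[IN]): ACC = {
      val acc = create()
      xs.foreach(x => accumulate(acc, x))
      acc
    }

    // Retracting vals(i) must equal accumulating everything except vals(i).
    for (i <- vals.indices) {
      val acc = accumulateAll(vals)
      retract(acc, vals(i))
      val expected = getValue(accumulateAll(vals.patch(i, Nil, 1)))
      assert(getValue(acc) == expected,
        s"retracting ${vals(i)} gave ${getValue(acc)}, expected $expected")
    }

    // Retracting null must not change the result.
    val acc = accumulateAll(vals)
    val before = getValue(acc)
    retract(acc, null.asInstanceOf[IN])
    assert(getValue(acc) == before, "retracting null changed the result")
  }
}
```

          Retracting only single values is what exposes a bug such as adding instead of subtracting: retracting every value can still look correct when the empty-count case hides the wrong sum.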

          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on the issue:

          https://github.com/apache/flink/pull/3470

          @fhueske sounds great. I updated the PR. Please take a look. I have added non-retractable and retractable aggregates for MIN and MAX, and added a flag parameter to AggregateUtil.transformToAggregateFunctions() to indicate whether an aggregate needs to support retraction.
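          The selection that such a flag drives can be sketched as a small mapping. `AggVariant`, `chooseVariant`, and the string-based aggregate names are hypothetical illustrations, and the real signature of `AggregateUtil.transformToAggregateFunctions()` may differ. In this sketch only MIN and MAX have two variants, matching the comment. Every other built-in aggregate uses its single version, which already supports `retract`.

```scala
sealed trait AggVariant
case object NonRetractable extends AggVariant
case object Retractable extends AggVariant

object AggVariantSketch {
  // Hypothetical mapping from (aggregate, needRetraction flag) to an implementation variant.
  def chooseVariant(aggName: String, needRetraction: Boolean): AggVariant =
    aggName.toUpperCase match {
      // MIN/MAX need extra state (a count per value) to support retraction,
      // so that version is only used when retraction is required.
      case "MIN" | "MAX" if needRetraction => Retractable
      case "MIN" | "MAX"                   => NonRetractable
      // In this sketch, the other built-ins have a single version that can retract.
      case _                               => Retractable
    }
}
```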

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3470

          Hi @shaoxuan-wang, yes. That's a very good point. We should definitely add non-retractable `MIN` and `MAX` aggregates. Would be good to have them now to use them in batch queries.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3470

          Thanks for the PR @shaoxuan-wang. I haven't had a detailed look yet. I think we also need non-retract versions of `MIN` and `MAX`. The retractable versions cannot be used to compute unbounded OVER windows and will be less efficient for batch queries (which do not need retraction).
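          For contrast with the retractable version, a non-retract MAX can be sketched as follows. `MaxSketch` is a hypothetical name, not Flink's class. Its entire state is the current maximum, so its state size stays constant however many rows an unbounded OVER window or a batch group contains. The retractable variant, by contrast, keeps a count for every distinct value. The trade-off is that the non-retract version cannot undo an input, so it has no `retract`.

```scala
// Hypothetical non-retractable MAX: constant-size state, no retract.
final class MaxSketch[T](implicit ord: Ordering[T]) {
  private var currentMax: Option[T] = None

  def accumulate(value: T): Unit =
    currentMax = Some(currentMax.fold(value)(m => ord.max(m, value)))

  // Partial results combine by taking the larger of the two maxima.
  def merge(other: MaxSketch[T]): Unit =
    other.currentMax.foreach(v => accumulate(v))

  def getValue: Option[T] = currentMax
}
```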

          Thanks, Fabian

          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on the issue:

          https://github.com/apache/flink/pull/3470

          @fhueske, we should add different built-in aggregates for the functions that are not easy to retract (for instance, Max and Min). One would be a withRetract version, used for OVER windows (where the number of items is bounded) and for DataStream when retraction is absolutely needed. The other would be a withoutRetract version, used for DataSet and for DataStream where retraction is not needed. It is up to the optimizer to check the rules and decide which aggregate should be used. I think we can go ahead with this (withRetract) for now and complete the entire design together with DataStream retraction.

          githubbot ASF GitHub Bot added a comment -

          GitHub user shaoxuan-wang opened a pull request:

          https://github.com/apache/flink/pull/3470

          FLINK-5956 [table] Add retract method for aggregateFunction

          This PR adds a retraction method to AggregateFunction. It also implements retract methods, as well as test cases, for all built-in aggregates.

          The retraction method is helpful for processing update messages. It will also be very helpful for grouping window and over window aggregation, where we may want to retract out-of-window data from the accumulator.
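          The over-window case can be illustrated with a minimal standalone sketch. It is plain Scala with a hypothetical `SlidingWindowSketch.slidingSums` helper, not Flink's runtime. Consider a window over the last `windowSize` rows, similar in spirit to `ROWS BETWEEN n PRECEDING AND CURRENT ROW`. Each new row is accumulated and the row that leaves the window is retracted, so each output costs O(1) instead of re-aggregating the whole window.

```scala
import scala.collection.mutable

object SlidingWindowSketch {
  // SUM over the last `windowSize` rows, maintained with accumulate/retract.
  def slidingSums(rows: Seq[Long], windowSize: Int): List[Long] = {
    require(windowSize > 0, "windowSize must be positive")
    val window = mutable.Queue.empty[Long]
    val out = mutable.ArrayBuffer.empty[Long]
    var sum = 0L
    for (v <- rows) {
      sum += v // accumulate the incoming row
      window.enqueue(v)
      if (window.size > windowSize) sum -= window.dequeue() // retract the expired row
      out += sum
    }
    out.toList
  }
}
```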

          Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
          If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
          In addition to going through the list, please provide a meaningful description of your changes.

          • [x] General
            • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
            • The pull request addresses only one issue
            • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [ ] Documentation
            • Documentation has been added for new functionality
            • Old documentation affected by the pull request has been updated
            • JavaDoc for public methods has been added
          • [x] Tests & Build
            • Functionality added by the pull request is covered by tests
            • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/shaoxuan-wang/flink F5956-submit

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3470.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3470


          commit 09a863bd70f4e8b693276e47942c1dff2fd5375e
          Author: shaoxuan-wang <wshaoxuan@gmail.com>
          Date: 2017-03-03T17:37:50Z

          FLINK-5956 [table] Add retract method for aggregateFunction



            People

            • Assignee: Shaoxuan Wang
              Reporter: Shaoxuan Wang
            • Votes: 0
              Watchers: 5
