  Flink / FLINK-4557 Table API Stream Aggregations / FLINK-5655

Add event time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

    Details

    • Type: Sub-task
    • Status: Closed
    • Priority: Major
    • Resolution: Implemented
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Table API & SQL
    • Labels: None

      Description

      The goal of this issue is to add support for OVER RANGE aggregations on event time streams to the SQL interface.

      Queries similar to the following should be supported:

      SELECT 
        a, 
        SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS sumB,
        MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS minB
      FROM myStream
      

      The following restrictions should initially apply:

      • All OVER clauses in the same SELECT clause must be exactly the same.
      • The PARTITION BY clause is optional (no partitioning results in single-threaded execution).
      • The ORDER BY clause may only reference rowTime(). rowTime() is a parameterless scalar function that indicates event time mode.
      • UNBOUNDED PRECEDING is not supported (see FLINK-5658).
      • FOLLOWING is not supported.

      The restrictions will be resolved in follow-up issues. If we find that some of the restrictions are trivial to address, we can add the functionality in this issue as well.
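      For illustration, a hypothetical query like the following would initially be rejected because its two OVER clauses are not identical:

      SELECT
        a,
        SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS sumB,
        MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN INTERVAL '2' HOUR PRECEDING AND CURRENT ROW) AS minB
      FROM myStream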

      This issue includes:

      • Design of the DataStream operator to compute OVER RANGE aggregates (a rough sketch follows below)
      • Translation from Calcite's RelNode representation (LogicalProject with RexOver expression).
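
      A rough sketch of one possible operator design (illustrative only; the class name EventTimeRangeOverSketch and the parameter precedingMillis are made up for this sketch, and the aggregation calls are only outlined): a keyed ProcessFunction buffers the rows of each event timestamp in MapState, registers an event-time timer per timestamp, and on the timer retracts the rows that fell out of the RANGE interval before emitting the aggregated results.

      import java.util.{ArrayList => JArrayList, List => JList}

      import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
      import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
      import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
      import org.apache.flink.configuration.Configuration
      import org.apache.flink.streaming.api.functions.ProcessFunction
      import org.apache.flink.types.Row
      import org.apache.flink.util.Collector

      class EventTimeRangeOverSketch(precedingMillis: Long, inputRowType: RowTypeInfo)
        extends ProcessFunction[Row, Row] {

        // rows grouped by their event timestamp
        private var rowsByTs: MapState[Long, JList[Row]] = _

        override def open(config: Configuration): Unit = {
          val keyType = BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
          val valueType: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputRowType)
          rowsByTs = getRuntimeContext.getMapState(
            new MapStateDescriptor[Long, JList[Row]]("rowsByTs", keyType, valueType))
        }

        override def processElement(
            input: Row,
            ctx: ProcessFunction[Row, Row]#Context,
            out: Collector[Row]): Unit = {
          val ts = ctx.timestamp // event timestamp of the row (requires assigned timestamps)
          var rows = rowsByTs.get(ts)
          if (rows == null) {
            rows = new JArrayList[Row]()
            // emit the results for this timestamp once the watermark has passed it
            ctx.timerService.registerEventTimeTimer(ts)
          }
          rows.add(input)
          rowsByTs.put(ts, rows)
        }

        override def onTimer(
            timestamp: Long,
            ctx: ProcessFunction[Row, Row]#OnTimerContext,
            out: Collector[Row]): Unit = {
          // 1) retract rows with timestamp < (timestamp - precedingMillis) from the
          //    accumulators and drop them from rowsByTs
          // 2) accumulate the rows stored under `timestamp`
          // 3) emit one result row per input row at `timestamp`
          //    (forwarded fields + current aggregate values)
        }
      }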


          Activity

          fhueske Fabian Hueske added a comment -

          As discussed on the mailing list, I'm assigning this issue to shaoxuan wang.

          wheat9 Haohui Mai added a comment -

          I'm particularly interested in the part that compiles the sliding windows from Calcite down to the DataStream operators.

          Maybe a dumb question – I wonder, is it possible to specify sliding grouped windows introduced in FLINK-4691 in this syntax? Essentially I am looking at something that directly maps to the sliding windows in the DataStream / Table APIs.

          Essentially I'm looking for https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html#sliding-windows. I can see there are two possible ways to express the sliding windows:

          1. SELECT SUM(b) OVER(PARTITION BY SlidingWindowGap(...) RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW)) FROM table
          2. SELECT SUM(b) FROM table GROUP BY SlidingWindow(size, interval)
          

          The approach (2) is implemented by https://msdn.microsoft.com/en-us/library/azure/dn835051.aspx.

          My question is that are (1) and (2) semantically equivalent? What is the right way to express grouped sliding windows?

          fhueske Fabian Hueske added a comment -

          Hi Haohui Mai,

           First of all, the term sliding window is a bit overloaded. What we call a Sliding (Group) Window in the DataStream API is not the same as a Sliding (Row) Window (hence your examples (1) and (2) are not semantically equivalent!). I think the sliding row window semantics are more common, but the term is coined differently in Flink and I don't think there is consensus to change that. For example, this document from the Calcite community calls what Flink calls "Sliding Windows" "Hopping Windows": http://calcite.apache.org/docs/stream.html

          Sorry for the confusion.

          It is possible to define sliding group windows (as described in FLIP-11) in SQL, however, it is a bit cumbersome.
           For instance, a sliding window of size 5 minutes that slides every minute could be defined as

          SELECT SUM(b) OVER (PARTITION BY a ORDER BY rowtime ROWS BETWEEN 5 PRECEDING AND CURRENT ROW)
          FROM (
            SELECT a, SUM(b) AS b, MAX(rowtime) AS rowtime
            FROM tab
            GROUP BY a, FLOOR(rowtime TO MINUTE)
          )
          

          This query basically first computes partial aggregates using a tumbling window and then the final aggregates using a row window based on row counts.
          However, there are a few issues with that.

          • we do not want to support event-time OVER ROW windows because they might cause very expensive updates for late data.
          • this is very hard to translate to Flink's built-in windows (or the Table API windows) because the logic is distributed across several operators.
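
           For comparison, the corresponding sliding group window in the Table API (FLIP-11) would look roughly like this (a sketch, assuming a registered table tab with a rowtime attribute):

           import org.apache.flink.table.api.scala._

           val result = tab
             .window(Slide over 5.minutes every 1.minutes on 'rowtime as 'w)
             .groupBy('w, 'a)
             .select('a, 'b.sum)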

          Hope this helps, Fabian

          ShaoxuanWang Shaoxuan Wang added a comment -

           sunjincheng and I want to start the implementation of this event-time bounded OVER window. We will start with a design doc. Reassigning the task to sunjincheng121.

          sunjincheng121 sunjincheng added a comment -

           Hi shaoxuan wang, thanks for the reassignment.

          githubbot ASF GitHub Bot added a comment -

          GitHub user sunjincheng121 opened a pull request:

          https://github.com/apache/flink/pull/3629

           FLINK-5655 [table] Add event time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

          Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
          If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
          In addition to going through the list, please provide a meaningful description of your changes.

          • [×] General
          • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
          • The pull request addresses only one issue
          • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [ ] Documentation
          • Documentation has been added for new functionality
          • Old documentation affected by the pull request has been updated
          • JavaDoc for public methods has been added
          • [×] Tests & Build
          • Functionality added by the pull request is covered by tests
          • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/sunjincheng121/flink FLINK-5655-PR

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3629.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3629


          commit ba4860afc7e548a6d8c32ce2276c34431d3999f9
          Author: 金竹 <jincheng.sunjc@alibaba-inc.com>
          Date: 2017-03-28T04:36:03Z

          FLINK-5655[table]Add event time OVER RANGE BETWEEN x PRECEDING aggregation to SQL


          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3629#discussion_r108463560

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala —
          @@ -0,0 +1,213 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
           +import java.util.{List => JList, ArrayList => JArrayList}
           +
           +import org.apache.flink.api.common.state._
           +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
           +import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
           +import org.apache.flink.configuration.Configuration
           +import org.apache.flink.streaming.api.functions.ProcessFunction
           +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
           +import org.apache.flink.types.Row
           +import org.apache.flink.util.{Collector, Preconditions}
          +
          +/**
          + * Process Function for RANGE clause event-time bounded OVER window
          + *
          + * @param aggregates the list of all [[AggregateFunction]] used for this aggregation
          + * @param aggFields the position (in the input Row) of the input value for each aggregate
          + * @param forwardedFieldCount the count of forwarded fields.
          + * @param aggregationStateType the row type info of aggregation
          + * @param inputRowType the row type info of input row
          + * @param precedingOffset the preceding offset
          + */
          +class RangeClauseBoundedOverProcessFunction(
          + private val aggregates: Array[AggregateFunction[_]],
          + private val aggFields: Array[Int],
          + private val forwardedFieldCount: Int,
          + private val aggregationStateType: RowTypeInfo,
          + private val inputRowType: RowTypeInfo,
          + private val precedingOffset: Long)
          + extends ProcessFunction[Row, Row] {
          +
          + Preconditions.checkNotNull(aggregates)
          + Preconditions.checkNotNull(aggFields)
          + Preconditions.checkArgument(aggregates.length == aggFields.length)
          + Preconditions.checkNotNull(forwardedFieldCount)
          + Preconditions.checkNotNull(aggregationStateType)
          + Preconditions.checkNotNull(precedingOffset)
          +
          + private var output: Row = _
          +
          + // the state which keeps the last triggering timestamp
          + private var lastTriggeringTsState: ValueState[Long] = _
          +
          + // the state which used to materialize the accumulator for incremental calculation
          + private var accumulatorState: ValueState[Row] = _
          +
          + // the state which keeps all the data that are not expired.
          + // The first element (as the mapState key) of the tuple is the time stamp. Per each time stamp,
          + // the second element of tuple is a list that contains the entire data of all the rows belonging
          + // to this time stamp.
          + private var dataState: MapState[Long, JList[Row]] = _
          +
           + override def open(config: Configuration) {
           +
           + output = new Row(forwardedFieldCount + aggregates.length)
           +
           + val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
           + new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
           + lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
           +
           + val accumulatorStateDescriptor =
           + new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
           + accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
           +
           + val keyTypeInformation: TypeInformation[Long] =
           + BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
           + val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputRowType)
           +
           + val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
           + new MapStateDescriptor[Long, JList[Row]](
           + "dataState",
           + keyTypeInformation,
           + valueTypeInformation)
           +
           + dataState = getRuntimeContext.getMapState(mapStateDescriptor)
           +
           + }

          +
          + override def processElement(
          + input: Row,
          + ctx: ProcessFunction[Row, Row]#Context,
          + out: Collector[Row]): Unit = {
          +
          + // triggering timestamp for trigger calculation
          + val triggeringTs = ctx.timestamp
          +
          + val lastTriggeringTs = lastTriggeringTsState.value
          +
          + // check if the data is expired, if not, save the data and register event time timer
          + if (triggeringTs > lastTriggeringTs) {
          + val data = dataState.get(triggeringTs)
           + if (null != data) {
           + data.add(input)
           + dataState.put(triggeringTs, data)
           + } else {
           + val data = new JArrayList[Row]
           + data.add(input)
           + dataState.put(triggeringTs, data)
           + // register event time timer
           + ctx.timerService.registerEventTimeTimer(triggeringTs)
           + }

          + }
          + }
          +
          + override def onTimer(
          + timestamp: Long,
          + ctx: ProcessFunction[Row, Row]#OnTimerContext,
          + out: Collector[Row]): Unit = {
          + // gets all window data from state for the calculation
          + val inputs: JList[Row] = dataState.get(timestamp)
          +
          + if (null != inputs) {
          +
          + var accumulators = accumulatorState.value
          + var dataListIndex = 0
          + var aggregatesIndex = 0
          +
          + // initialize when first run or failover recovery per key
          + if (null == accumulators) {
          + accumulators = new Row(aggregates.length)
          + aggregatesIndex = 0
           + while (aggregatesIndex < aggregates.length) {
           + accumulators.setField(aggregatesIndex, aggregates(aggregatesIndex).createAccumulator())
           + aggregatesIndex += 1
           + }

          + }
          +
          + // keep up timestamps of retract data
          + val retractTsList: JList[Long] = new JArrayList[Long]
          +
          + val dataTimestampIt = dataState.keys.iterator
          + while (dataTimestampIt.hasNext) {
          + val dataTs: Long = dataTimestampIt.next()
          + val offset = timestamp - dataTs
          + if (offset > precedingOffset) {
          + val retractDataList = dataState.get(dataTs)
          + dataListIndex = 0
          + while (dataListIndex < retractDataList.size()) {
          + aggregatesIndex = 0
           + while (aggregatesIndex < aggregates.length) {
           + val accumulator = accumulators.getField(aggregatesIndex).asInstanceOf[Accumulator]
           + aggregates(aggregatesIndex)
           + .retract(accumulator, retractDataList.get(dataListIndex).getField(aggFields(aggregatesIndex)))
           + aggregatesIndex += 1
           + }

          + dataListIndex += 1
          + retractTsList.add(dataTs)
          — End diff –

           this should be moved out of the loop.
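
           A sketch of the suggested change, reusing the names from the diff above (the retraction calls are elided):

           if (offset > precedingOffset) {
             val retractDataList = dataState.get(dataTs)
             dataListIndex = 0
             while (dataListIndex < retractDataList.size()) {
               // ... retract retractDataList.get(dataListIndex) from all accumulators ...
               dataListIndex += 1
             }
             // record the expired timestamp once per timestamp, outside the per-row loop
             retractTsList.add(dataTs)
           }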

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3629#discussion_r108455167

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala —
          @@ -0,0 +1,213 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
           +import java.util.{List => JList, ArrayList => JArrayList}
           +
           +import org.apache.flink.api.common.state._
           +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
           +import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
           +import org.apache.flink.configuration.Configuration
           +import org.apache.flink.streaming.api.functions.ProcessFunction
           +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
           +import org.apache.flink.types.Row
           +import org.apache.flink.util.{Collector, Preconditions}
          +
          +/**
          + * Process Function for RANGE clause event-time bounded OVER window
          + *
          + * @param aggregates the list of all [[AggregateFunction]] used for this aggregation
          + * @param aggFields the position (in the input Row) of the input value for each aggregate
          + * @param forwardedFieldCount the count of forwarded fields.
          + * @param aggregationStateType the row type info of aggregation
          + * @param inputRowType the row type info of input row
          + * @param precedingOffset the preceding offset
          + */
          +class RangeClauseBoundedOverProcessFunction(
          + private val aggregates: Array[AggregateFunction[_]],
          + private val aggFields: Array[Int],
          + private val forwardedFieldCount: Int,
          + private val aggregationStateType: RowTypeInfo,
          + private val inputRowType: RowTypeInfo,
          + private val precedingOffset: Long)
          + extends ProcessFunction[Row, Row] {
          +
          + Preconditions.checkNotNull(aggregates)
          + Preconditions.checkNotNull(aggFields)
          + Preconditions.checkArgument(aggregates.length == aggFields.length)
          + Preconditions.checkNotNull(forwardedFieldCount)
          + Preconditions.checkNotNull(aggregationStateType)
          + Preconditions.checkNotNull(precedingOffset)
          +
          + private var output: Row = _
          +
          + // the state which keeps the last triggering timestamp
          + private var lastTriggeringTsState: ValueState[Long] = _
          +
          + // the state which used to materialize the accumulator for incremental calculation
          + private var accumulatorState: ValueState[Row] = _
          +
          + // the state which keeps all the data that are not expired.
          + // The first element (as the mapState key) of the tuple is the time stamp. Per each time stamp,
          + // the second element of tuple is a list that contains the entire data of all the rows belonging
          + // to this time stamp.
          + private var dataState: MapState[Long, JList[Row]] = _
          +
           + override def open(config: Configuration) {
           +
           + output = new Row(forwardedFieldCount + aggregates.length)
           +
           + val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
           + new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
           + lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
           +
           + val accumulatorStateDescriptor =
           + new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
           + accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
           +
           + val keyTypeInformation: TypeInformation[Long] =
           + BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
           + val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputRowType)
           +
           + val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
           + new MapStateDescriptor[Long, JList[Row]](
           + "dataState",
           + keyTypeInformation,
           + valueTypeInformation)
           +
           + dataState = getRuntimeContext.getMapState(mapStateDescriptor)
           +
           + }

          +
          + override def processElement(
          + input: Row,
          + ctx: ProcessFunction[Row, Row]#Context,
          + out: Collector[Row]): Unit = {
          +
          + // triggering timestamp for trigger calculation
          + val triggeringTs = ctx.timestamp
          +
          + val lastTriggeringTs = lastTriggeringTsState.value
          +
          + // check if the data is expired, if not, save the data and register event time timer
          + if (triggeringTs > lastTriggeringTs) {
          + val data = dataState.get(triggeringTs)
           + if (null != data) {
           + data.add(input)
           + dataState.put(triggeringTs, data)
           + } else {
           + val data = new JArrayList[Row]
           + data.add(input)
           + dataState.put(triggeringTs, data)
           + // register event time timer
           + ctx.timerService.registerEventTimeTimer(triggeringTs)
           + }

          + }
          + }
          +
          + override def onTimer(
          + timestamp: Long,
          + ctx: ProcessFunction[Row, Row]#OnTimerContext,
          + out: Collector[Row]): Unit = {
          + // gets all window data from state for the calculation
          + val inputs: JList[Row] = dataState.get(timestamp)
          +
          + if (null != inputs) {
          +
          + var accumulators = accumulatorState.value
          + var dataListIndex = 0
          + var aggregatesIndex = 0
          +
          + // initialize when first run or failover recovery per key
          + if (null == accumulators) {
          + accumulators = new Row(aggregates.length)
          + aggregatesIndex = 0
           + while (aggregatesIndex < aggregates.length) {
           + accumulators.setField(aggregatesIndex, aggregates(aggregatesIndex).createAccumulator())
           + aggregatesIndex += 1
           + }

          + }
          +
          + // keep up timestamps of retract data
          + val retractTsList: JList[Long] = new JArrayList[Long]
          — End diff –

          does this in worst case require as much memory as the list in the unbounded case?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3629#discussion_r108470110

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala —
           @@ -411,6 +411,128 @@ class SqlITCase extends StreamingWithStateTestBase {
              assertEquals(expected.sorted, StreamITCase.testResults.sorted)
            }

          + @Test
          + def testBoundPartitionedEventTimeWindowWithRange(): Unit = {
          + val data = Seq(
          + Left((1000L, (1L, 1, "Hello"))),
          + Left((1000L, (1L, 1, "Hello"))),
          — End diff –

          The problem of copying the forwarded fields is not captured by the tests, because all rows for the same timestamp are identical.
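
           For example (hypothetical rows), adding rows with the same timestamp but different values would expose it:

           // same event timestamp, different forwarded fields
           Left((1000L, (1L, 1, "Hello"))),
           Left((1000L, (2L, 5, "Hello"))),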

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3629#discussion_r108470934

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala —
           @@ -411,6 +411,128 @@ class SqlITCase extends StreamingWithStateTestBase {
              assertEquals(expected.sorted, StreamITCase.testResults.sorted)
            }

          + @Test
           + def testBoundPartitionedEventTimeWindowWithRange(): Unit = {
           + val data = Seq(
           + Left((1000L, (1L, 1, "Hello"))),
           + Left((1000L, (1L, 1, "Hello"))),
           + Left((1000L, (1L, 1, "Hello"))),
           + Left((2000L, (2L, 2, "Hello"))),
           + Right(1000L),
           + Left((2000L, (2L, 2, "Hello"))),
           + Left((2000L, (2L, 2, "Hello"))),
           + Left((3000L, (3L, 3, "Hello"))),
           + Right(2000L),
           + Left((4000L, (4L, 4, "Hello"))),
           + Right(3000L),
           + Left((5000L, (5L, 5, "Hello"))),
           + Right(5000L),
           + Left((6000L, (6L, 6, "Hello"))),
           + Right(7000L),
           + Left((8000L, (7L, 7, "Hello World"))),
           + Left((8000L, (7L, 7, "Hello World"))),
           + Left((10000L, (7L, 7, "Hello World"))),
           + Right(8000L),
           + Left((12000L, (7L, 7, "Hello World"))),
           + Right(9000L),
           + Left((13000L, (8L, 8, "Hello World"))),
           + Right(11000L),
           + Left((20000L, (20L, 20, "Hello World"))),
           + Right(19000L))
           +
           + val env = StreamExecutionEnvironment.getExecutionEnvironment
           + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
           + env.setStateBackend(getStateBackend)
           + val tEnv = TableEnvironment.getTableEnvironment(env)
           + StreamITCase.clear
           +
           + val t1 = env
           + .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
           + .toTable(tEnv).as('a, 'b, 'c)
           +
           + tEnv.registerTable("T1", t1)
           +
           + val sqlQuery = "SELECT " +
           + "c, a, " +
           + "count(a) OVER (PARTITION BY c ORDER BY RowTime() RANGE BETWEEN INTERVAL '1' SECOND " +
           + "preceding AND CURRENT ROW)" +
           + ", sum(a) OVER (PARTITION BY c ORDER BY RowTime() RANGE BETWEEN INTERVAL '1' SECOND " +
           + " preceding AND CURRENT ROW)" +
           + " from T1"
           +
           + val result = tEnv.sql(sqlQuery).toDataStream[Row]
           + result.addSink(new StreamITCase.StringSink)
           + env.execute()
           +
           + val expected = mutable.MutableList(
           + "Hello,1,3,3", "Hello,1,3,3", "Hello,1,3,3",
           + "Hello,2,6,9", "Hello,2,6,9","Hello,2,6,9",
           + "Hello,3,4,9",
           + "Hello,4,2,7",
           + "Hello,5,2,9",
           + "Hello,6,2,11",
           + "Hello World,7,2,14", "Hello World,7,2,14", "Hello World,7,1,7", "Hello World,7,1,7",
           + "Hello World,8,2,15",
           + "Hello World,20,1,20")
           + assertEquals(expected.sorted, StreamITCase.testResults.sorted)
           + }

          +
          + @Test
          + def testBoundNonPartitionedEventTimeWindowWithRange(): Unit = {
          + val data = Seq(
          + Left((1000L, (1L, 1, "Hello"))),
          — End diff –

          Please cover more corner cases with the test data (see above).

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3629#discussion_r108453440

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala —
          @@ -0,0 +1,213 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
           +import java.util.{List => JList, ArrayList => JArrayList}
           +
           +import org.apache.flink.api.common.state._
           +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
           +import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
           +import org.apache.flink.configuration.Configuration
           +import org.apache.flink.streaming.api.functions.ProcessFunction
           +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
           +import org.apache.flink.types.Row
           +import org.apache.flink.util.{Collector, Preconditions}
          +
          +/**
          + * Process Function for RANGE clause event-time bounded OVER window
          + *
          + * @param aggregates the list of all [[AggregateFunction]] used for this aggregation
          + * @param aggFields the position (in the input Row) of the input value for each aggregate
          + * @param forwardedFieldCount the count of forwarded fields.
          + * @param aggregationStateType the row type info of aggregation
          + * @param inputRowType the row type info of input row
          + * @param precedingOffset the preceding offset
          + */
          +class RangeClauseBoundedOverProcessFunction(
          + private val aggregates: Array[AggregateFunction[_]],
          + private val aggFields: Array[Int],
          + private val forwardedFieldCount: Int,
          + private val aggregationStateType: RowTypeInfo,
          + private val inputRowType: RowTypeInfo,
          + private val precedingOffset: Long)
          + extends ProcessFunction[Row, Row] {
          +
          + Preconditions.checkNotNull(aggregates)
          + Preconditions.checkNotNull(aggFields)
          + Preconditions.checkArgument(aggregates.length == aggFields.length)
          + Preconditions.checkNotNull(forwardedFieldCount)
          + Preconditions.checkNotNull(aggregationStateType)
          + Preconditions.checkNotNull(precedingOffset)
          +
          + private var output: Row = _
          +
          + // the state which keeps the last triggering timestamp
          + private var lastTriggeringTsState: ValueState[Long] = _
          +
          + // the state which used to materialize the accumulator for incremental calculation
          + private var accumulatorState: ValueState[Row] = _
          +
          + // the state which keeps all the data that are not expired.
          + // The first element (as the mapState key) of the tuple is the time stamp. Per each time stamp,
          + // the second element of tuple is a list that contains the entire data of all the rows belonging
          + // to this time stamp.
          + private var dataState: MapState[Long, JList[Row]] = _
          +
           + override def open(config: Configuration) {
           +
           + output = new Row(forwardedFieldCount + aggregates.length)
           +
           + val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
           + new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
           + lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
           +
           + val accumulatorStateDescriptor =
           + new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
           + accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
           +
           + val keyTypeInformation: TypeInformation[Long] =
           + BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
           + val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputRowType)
           +
           + val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
           + new MapStateDescriptor[Long, JList[Row]](
           + "dataState",
           + keyTypeInformation,
           + valueTypeInformation)
           +
           + dataState = getRuntimeContext.getMapState(mapStateDescriptor)
           +
           + }

          +
          + override def processElement(
          + input: Row,
          + ctx: ProcessFunction[Row, Row]#Context,
          + out: Collector[Row]): Unit = {
          +
          + // triggering timestamp for trigger calculation
          + val triggeringTs = ctx.timestamp
          +
          + val lastTriggeringTs = lastTriggeringTsState.value
          — End diff –

          If the state has not been initialized, will we get a NPE?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3629#discussion_r108467653

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala —
          @@ -0,0 +1,213 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
+import java.util.{List => JList, ArrayList => JArrayList}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}

          +
          +/**
          + * Process Function for RANGE clause event-time bounded OVER window
          + *
          + * @param aggregates the list of all [[AggregateFunction]] used for this aggregation
          + * @param aggFields the position (in the input Row) of the input value for each aggregate
          + * @param forwardedFieldCount the count of forwarded fields.
          + * @param aggregationStateType the row type info of aggregation
          + * @param inputRowType the row type info of input row
          + * @param precedingOffset the preceding offset
          + */
          +class RangeClauseBoundedOverProcessFunction(
          + private val aggregates: Array[AggregateFunction[_]],
          + private val aggFields: Array[Int],
          + private val forwardedFieldCount: Int,
          + private val aggregationStateType: RowTypeInfo,
          + private val inputRowType: RowTypeInfo,
          + private val precedingOffset: Long)
          + extends ProcessFunction[Row, Row] {
          +
          + Preconditions.checkNotNull(aggregates)
          + Preconditions.checkNotNull(aggFields)
          + Preconditions.checkArgument(aggregates.length == aggFields.length)
          + Preconditions.checkNotNull(forwardedFieldCount)
          + Preconditions.checkNotNull(aggregationStateType)
          + Preconditions.checkNotNull(precedingOffset)
          +
          + private var output: Row = _
          +
          + // the state which keeps the last triggering timestamp
          + private var lastTriggeringTsState: ValueState[Long] = _
          +
          + // the state which used to materialize the accumulator for incremental calculation
          + private var accumulatorState: ValueState[Row] = _
          +
          + // the state which keeps all the data that are not expired.
          + // The first element (as the mapState key) of the tuple is the time stamp. Per each time stamp,
          + // the second element of tuple is a list that contains the entire data of all the rows belonging
          + // to this time stamp.
          + private var dataState: MapState[Long, JList[Row]] = _
          +
+ override def open(config: Configuration) {
+
+ output = new Row(forwardedFieldCount + aggregates.length)
+
+ val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
+ new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
+ lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
+
+ val accumulatorStateDescriptor =
+ new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
+ accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
+
+ val keyTypeInformation: TypeInformation[Long] =
+ BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
+ val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputRowType)
+
+ val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+ new MapStateDescriptor[Long, JList[Row]](
+ "dataState",
+ keyTypeInformation,
+ valueTypeInformation)
+
+ dataState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+ }

          +
          + override def processElement(
          + input: Row,
          + ctx: ProcessFunction[Row, Row]#Context,
          + out: Collector[Row]): Unit = {
          +
          + // triggering timestamp for trigger calculation
          + val triggeringTs = ctx.timestamp
          +
          + val lastTriggeringTs = lastTriggeringTsState.value
          +
          + // check if the data is expired, if not, save the data and register event time timer
          + if (triggeringTs > lastTriggeringTs) {
          + val data = dataState.get(triggeringTs)
+ if (null != data) {
+ data.add(input)
+ dataState.put(triggeringTs, data)
+ } else {
+ val data = new JArrayList[Row]
+ data.add(input)
+ dataState.put(triggeringTs, data)
+ // register event time timer
+ ctx.timerService.registerEventTimeTimer(triggeringTs)
+ }
          + }
          + }
          +
          + override def onTimer(
          + timestamp: Long,
          + ctx: ProcessFunction[Row, Row]#OnTimerContext,
          + out: Collector[Row]): Unit = {
          + // gets all window data from state for the calculation
          + val inputs: JList[Row] = dataState.get(timestamp)
          +
          + if (null != inputs) {
          +
          + var accumulators = accumulatorState.value
          + var dataListIndex = 0
          + var aggregatesIndex = 0
          +
          + // initialize when first run or failover recovery per key
          + if (null == accumulators) {
          + accumulators = new Row(aggregates.length)
          + aggregatesIndex = 0
+ while (aggregatesIndex < aggregates.length) {
+ accumulators.setField(aggregatesIndex, aggregates(aggregatesIndex).createAccumulator())
+ aggregatesIndex += 1
+ }

          + }
          +
          + // keep up timestamps of retract data
          + val retractTsList: JList[Long] = new JArrayList[Long]
          +
          + val dataTimestampIt = dataState.keys.iterator
          + while (dataTimestampIt.hasNext) {
          + val dataTs: Long = dataTimestampIt.next()
          + val offset = timestamp - dataTs
          + if (offset > precedingOffset) {
          + val retractDataList = dataState.get(dataTs)
          + dataListIndex = 0
          + while (dataListIndex < retractDataList.size()) {
          + aggregatesIndex = 0
+ while (aggregatesIndex < aggregates.length) {
+ val accumulator = accumulators.getField(aggregatesIndex).asInstanceOf[Accumulator]
+ aggregates(aggregatesIndex)
+ .retract(accumulator, retractDataList.get(dataListIndex).getField(aggFields(aggregatesIndex)))
+ aggregatesIndex += 1
+ }

          + dataListIndex += 1
          + retractTsList.add(dataTs)
          + }
          + }
          + }
          +
          + // remove the data that has been retracted
          + dataListIndex = 0
+ while (dataListIndex < retractTsList.size) {
+ dataState.remove(retractTsList.get(dataListIndex))
+ dataListIndex += 1
+ }

          +
          + // copy forwarded fields to output row
          + aggregatesIndex = 0
          + while (aggregatesIndex < forwardedFieldCount) {
          — End diff –

I think the forwarded fields have to be copied for each row in the `inputs` list before it is emitted.
          If this is not captured by the tests, they should be adapted.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3629#discussion_r108470691

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala —
@@ -411,6 +411,128 @@ class SqlITCase extends StreamingWithStateTestBase {
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}

          + @Test
          + def testBoundPartitionedEventTimeWindowWithRange(): Unit = {
          + val data = Seq(
          — End diff –

          I think a bit more diverse test data would be good to cover more corner cases.
Basically, check that each loop is correctly triggered (retract data of two or more timestamps, emit different rows for one timestamp, etc.).
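For example, a slightly more varied input along those lines (values made up purely for illustration) could spread rows across and inside the window range, put several rows on one timestamp, and use more than one key:

// Hypothetical test input: several rows per timestamp, rows inside the
// 1-second range, and rows that retract data of more than one older timestamp.
val data = Seq(
  Left((1000L, (1L, 1, "Hello"))),
  Left((1000L, (2L, 2, "Hello"))),  // second row with the same timestamp
  Left((1500L, (3L, 3, "Hello"))),  // row inside the 1-second range
  Left((2500L, (4L, 4, "Hello"))),  // forces retraction of the 1000L rows
  Left((2500L, (5L, 5, "Hallo"))),  // different key at the same timestamp
  Left((4000L, (6L, 6, "Hello")))   // retracts rows of two older timestamps
)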

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3629#discussion_r108450477

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala —
          @@ -91,21 +91,22 @@ object AggregateUtil {
          }

          /**

- * Create an [[org.apache.flink.streaming.api.functions.ProcessFunction]] for ROWS clause
+ * Create an [[org.apache.flink.streaming.api.functions.ProcessFunction]] for
* bounded OVER window to evaluate final aggregate value.
*
* @param namedAggregates List of calls to aggregate functions and their output field names
* @param inputType Input row type
- * @param inputFields All input fields
* @param precedingOffset the preceding offset
+ * @param isRangeClause It is a tag that indicates whether the OVER clause is rangeClause
* @param isRowTimeType It is a tag that indicates whether the time type is rowTimeType
* @return [[org.apache.flink.streaming.api.functions.ProcessFunction]]
*/
- private[flink] def createRowsClauseBoundedOverProcessFunction(
+ private[flink] def createBoundedOverProcessFunction(
namedAggregates: Seq[CalcitePair[AggregateCall, String]],
inputType: RelDataType,
precedingOffset: Long,
- isRowTimeType: Boolean): ProcessFunction[Row, Row] = {
+ isRangeClause: Boolean = false,

— End diff –

I don't think we should use default values here.
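As a small illustration (hypothetical call sites, signature abbreviated to the parameters visible in the diff above), dropping the default forces every caller to state which clause it builds:

// Without a default, each call site documents the OVER clause explicitly.
val rowsFunction = createBoundedOverProcessFunction(
  namedAggregates, inputType, precedingOffset, isRangeClause = false, isRowTimeType = true)
val rangeFunction = createBoundedOverProcessFunction(
  namedAggregates, inputType, precedingOffset, isRangeClause = true, isRowTimeType = true)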

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3629#discussion_r108467310

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala —
          @@ -0,0 +1,213 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
+import java.util.{List => JList, ArrayList => JArrayList}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}

          +
          +/**
          + * Process Function for RANGE clause event-time bounded OVER window
          + *
          + * @param aggregates the list of all [[AggregateFunction]] used for this aggregation
          + * @param aggFields the position (in the input Row) of the input value for each aggregate
          + * @param forwardedFieldCount the count of forwarded fields.
          + * @param aggregationStateType the row type info of aggregation
          + * @param inputRowType the row type info of input row
          + * @param precedingOffset the preceding offset
          + */
          +class RangeClauseBoundedOverProcessFunction(
          + private val aggregates: Array[AggregateFunction[_]],
          + private val aggFields: Array[Int],
          + private val forwardedFieldCount: Int,
          + private val aggregationStateType: RowTypeInfo,
          + private val inputRowType: RowTypeInfo,
          + private val precedingOffset: Long)
          + extends ProcessFunction[Row, Row] {
          +
          + Preconditions.checkNotNull(aggregates)
          + Preconditions.checkNotNull(aggFields)
          + Preconditions.checkArgument(aggregates.length == aggFields.length)
          + Preconditions.checkNotNull(forwardedFieldCount)
          + Preconditions.checkNotNull(aggregationStateType)
          + Preconditions.checkNotNull(precedingOffset)
          +
          + private var output: Row = _
          +
          + // the state which keeps the last triggering timestamp
          + private var lastTriggeringTsState: ValueState[Long] = _
          +
          + // the state which used to materialize the accumulator for incremental calculation
          + private var accumulatorState: ValueState[Row] = _
          +
          + // the state which keeps all the data that are not expired.
          + // The first element (as the mapState key) of the tuple is the time stamp. Per each time stamp,
          + // the second element of tuple is a list that contains the entire data of all the rows belonging
          + // to this time stamp.
          + private var dataState: MapState[Long, JList[Row]] = _
          +
+ override def open(config: Configuration) {
+
+ output = new Row(forwardedFieldCount + aggregates.length)
+
+ val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
+ new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
+ lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
+
+ val accumulatorStateDescriptor =
+ new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
+ accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
+
+ val keyTypeInformation: TypeInformation[Long] =
+ BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
+ val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputRowType)
+
+ val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+ new MapStateDescriptor[Long, JList[Row]](
+ "dataState",
+ keyTypeInformation,
+ valueTypeInformation)
+
+ dataState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+ }

          +
          + override def processElement(
          + input: Row,
          + ctx: ProcessFunction[Row, Row]#Context,
          + out: Collector[Row]): Unit = {
          +
          + // triggering timestamp for trigger calculation
          + val triggeringTs = ctx.timestamp
          +
          + val lastTriggeringTs = lastTriggeringTsState.value
          +
          + // check if the data is expired, if not, save the data and register event time timer
          + if (triggeringTs > lastTriggeringTs) {
          + val data = dataState.get(triggeringTs)
+ if (null != data) {
+ data.add(input)
+ dataState.put(triggeringTs, data)
+ } else {
+ val data = new JArrayList[Row]
+ data.add(input)
+ dataState.put(triggeringTs, data)
+ // register event time timer
+ ctx.timerService.registerEventTimeTimer(triggeringTs)
+ }
          + }
          + }
          +
          + override def onTimer(
          + timestamp: Long,
          + ctx: ProcessFunction[Row, Row]#OnTimerContext,
          + out: Collector[Row]): Unit = {
          + // gets all window data from state for the calculation
          + val inputs: JList[Row] = dataState.get(timestamp)
          +
          + if (null != inputs) {
          +
          + var accumulators = accumulatorState.value
          + var dataListIndex = 0
          + var aggregatesIndex = 0
          +
          + // initialize when first run or failover recovery per key
          + if (null == accumulators) {
          + accumulators = new Row(aggregates.length)
          + aggregatesIndex = 0
+ while (aggregatesIndex < aggregates.length) {
+ accumulators.setField(aggregatesIndex, aggregates(aggregatesIndex).createAccumulator())
+ aggregatesIndex += 1
+ }

          + }
          +
          + // keep up timestamps of retract data
          + val retractTsList: JList[Long] = new JArrayList[Long]
          +
          + val dataTimestampIt = dataState.keys.iterator
          + while (dataTimestampIt.hasNext) {
          + val dataTs: Long = dataTimestampIt.next()
          + val offset = timestamp - dataTs
          + if (offset > precedingOffset) {
          + val retractDataList = dataState.get(dataTs)
          + dataListIndex = 0
          + while (dataListIndex < retractDataList.size()) {
          + aggregatesIndex = 0
+ while (aggregatesIndex < aggregates.length) {
+ val accumulator = accumulators.getField(aggregatesIndex).asInstanceOf[Accumulator]
+ aggregates(aggregatesIndex)
+ .retract(accumulator, retractDataList.get(dataListIndex).getField(aggFields(aggregatesIndex)))
+ aggregatesIndex += 1
+ }

          + dataListIndex += 1
          + retractTsList.add(dataTs)
          + }
          + }
          + }
          +
          + // remove the data that has been retracted
          + dataListIndex = 0
+ while (dataListIndex < retractTsList.size) {
+ dataState.remove(retractTsList.get(dataListIndex))
+ dataListIndex += 1
+ }

          +
          + // copy forwarded fields to output row
          + aggregatesIndex = 0
+ while (aggregatesIndex < forwardedFieldCount) {
+ output.setField(aggregatesIndex, inputs.get(0).getField(aggregatesIndex))
+ aggregatesIndex += 1
+ }

          +
          + dataListIndex = 0
          + while (dataListIndex < inputs.size()) {
          + // accumulate current row and set aggregate in output row
          + aggregatesIndex = 0
          + while (aggregatesIndex < aggregates.length) {
          + val index = forwardedFieldCount + aggregatesIndex
          + val accumulator = accumulators.getField(aggregatesIndex).asInstanceOf[Accumulator]
          + aggregates(aggregatesIndex).accumulate(accumulator, inputs.get(dataListIndex).getField(aggFields(aggregatesIndex)))
          + if (dataListIndex >= (inputs.size() - 1)) {
          — End diff –

Can't we do this after both loops have terminated?
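A sketch of the restructuring the question hints at, reusing the names from the diff above (illustrative only, assuming the aggregate results only need to be written to the output row once per timestamp):

// Sketch: first accumulate every row that belongs to this timestamp ...
var i = 0
while (i < inputs.size()) {
  var j = 0
  while (j < aggregates.length) {
    val acc = accumulators.getField(j).asInstanceOf[Accumulator]
    aggregates(j).accumulate(acc, inputs.get(i).getField(aggFields(j)))
    j += 1
  }
  i += 1
}
// ... and only then materialize the aggregate values into the output row once.
var j = 0
while (j < aggregates.length) {
  val acc = accumulators.getField(j).asInstanceOf[Accumulator]
  output.setField(forwardedFieldCount + j, aggregates(j).getValue(acc))
  j += 1
}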

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3629#discussion_r108469954

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala —
@@ -411,6 +411,128 @@ class SqlITCase extends StreamingWithStateTestBase {
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}

          + @Test
          + def testBoundPartitionedEventTimeWindowWithRange(): Unit = {
          + val data = Seq(
          + Left((1000L, (1L, 1, "Hello"))),
          + Left((1000L, (1L, 1, "Hello"))),
          + Left((1000L, (1L, 1, "Hello"))),
          — End diff –

          If the window size is 1 second, there should also be some data in between (like `1500L`)

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3629#discussion_r108467898

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala —
          @@ -0,0 +1,213 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
+import java.util.{List => JList, ArrayList => JArrayList}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}

          +
          +/**
          + * Process Function for RANGE clause event-time bounded OVER window
          + *
          + * @param aggregates the list of all [[AggregateFunction]] used for this aggregation
          + * @param aggFields the position (in the input Row) of the input value for each aggregate
          + * @param forwardedFieldCount the count of forwarded fields.
          + * @param aggregationStateType the row type info of aggregation
          + * @param inputRowType the row type info of input row
          + * @param precedingOffset the preceding offset
          + */
          +class RangeClauseBoundedOverProcessFunction(
          + private val aggregates: Array[AggregateFunction[_]],
          + private val aggFields: Array[Int],
          + private val forwardedFieldCount: Int,
          + private val aggregationStateType: RowTypeInfo,
          + private val inputRowType: RowTypeInfo,
          + private val precedingOffset: Long)
          + extends ProcessFunction[Row, Row] {
          +
          + Preconditions.checkNotNull(aggregates)
          + Preconditions.checkNotNull(aggFields)
          + Preconditions.checkArgument(aggregates.length == aggFields.length)
          + Preconditions.checkNotNull(forwardedFieldCount)
          + Preconditions.checkNotNull(aggregationStateType)
          + Preconditions.checkNotNull(precedingOffset)
          +
          + private var output: Row = _
          +
          + // the state which keeps the last triggering timestamp
          + private var lastTriggeringTsState: ValueState[Long] = _
          +
          + // the state which used to materialize the accumulator for incremental calculation
          + private var accumulatorState: ValueState[Row] = _
          +
          + // the state which keeps all the data that are not expired.
          + // The first element (as the mapState key) of the tuple is the time stamp. Per each time stamp,
          + // the second element of tuple is a list that contains the entire data of all the rows belonging
          + // to this time stamp.
          + private var dataState: MapState[Long, JList[Row]] = _
          +
+ override def open(config: Configuration) {
+
+ output = new Row(forwardedFieldCount + aggregates.length)
+
+ val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
+ new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
+ lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
+
+ val accumulatorStateDescriptor =
+ new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
+ accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
+
+ val keyTypeInformation: TypeInformation[Long] =
+ BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
+ val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputRowType)
+
+ val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+ new MapStateDescriptor[Long, JList[Row]](
+ "dataState",
+ keyTypeInformation,
+ valueTypeInformation)
+
+ dataState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+ }

          +
          + override def processElement(
          + input: Row,
          + ctx: ProcessFunction[Row, Row]#Context,
          + out: Collector[Row]): Unit = {
          +
          + // triggering timestamp for trigger calculation
          + val triggeringTs = ctx.timestamp
          +
          + val lastTriggeringTs = lastTriggeringTsState.value
          +
          + // check if the data is expired, if not, save the data and register event time timer
          + if (triggeringTs > lastTriggeringTs) {
          + val data = dataState.get(triggeringTs)
+ if (null != data) {
+ data.add(input)
+ dataState.put(triggeringTs, data)
+ } else {
+ val data = new JArrayList[Row]
+ data.add(input)
+ dataState.put(triggeringTs, data)
+ // register event time timer
+ ctx.timerService.registerEventTimeTimer(triggeringTs)
+ }
          + }
          + }
          +
          + override def onTimer(
          + timestamp: Long,
          + ctx: ProcessFunction[Row, Row]#OnTimerContext,
          + out: Collector[Row]): Unit = {
          + // gets all window data from state for the calculation
          + val inputs: JList[Row] = dataState.get(timestamp)
          +
          + if (null != inputs) {
          +
          + var accumulators = accumulatorState.value
          + var dataListIndex = 0
          + var aggregatesIndex = 0
          +
          + // initialize when first run or failover recovery per key
          + if (null == accumulators) {
          + accumulators = new Row(aggregates.length)
          + aggregatesIndex = 0
+ while (aggregatesIndex < aggregates.length) {
+ accumulators.setField(aggregatesIndex, aggregates(aggregatesIndex).createAccumulator())
+ aggregatesIndex += 1
+ }

          + }
          +
          + // keep up timestamps of retract data
          + val retractTsList: JList[Long] = new JArrayList[Long]
          +
          + val dataTimestampIt = dataState.keys.iterator
          + while (dataTimestampIt.hasNext) {
          + val dataTs: Long = dataTimestampIt.next()
          + val offset = timestamp - dataTs
          + if (offset > precedingOffset) {
          + val retractDataList = dataState.get(dataTs)
          + dataListIndex = 0
          + while (dataListIndex < retractDataList.size()) {
          + aggregatesIndex = 0
+ while (aggregatesIndex < aggregates.length) {
+ val accumulator = accumulators.getField(aggregatesIndex).asInstanceOf[Accumulator]
+ aggregates(aggregatesIndex)
+ .retract(accumulator, retractDataList.get(dataListIndex).getField(aggFields(aggregatesIndex)))
+ aggregatesIndex += 1
+ }

          + dataListIndex += 1
          + retractTsList.add(dataTs)
          + }
          + }
          + }
          +
          + // remove the data that has been retracted
          + dataListIndex = 0
+ while (dataListIndex < retractTsList.size) {
+ dataState.remove(retractTsList.get(dataListIndex))
+ dataListIndex += 1
+ }

          +
          + // copy forwarded fields to output row
          + aggregatesIndex = 0
+ while (aggregatesIndex < forwardedFieldCount) {
+ output.setField(aggregatesIndex, inputs.get(0).getField(aggregatesIndex))
+ aggregatesIndex += 1
+ }

          +
          + dataListIndex = 0
          + while (dataListIndex < inputs.size()) {
          + // accumulate current row and set aggregate in output row
          + aggregatesIndex = 0
          + while (aggregatesIndex < aggregates.length) {
          + val index = forwardedFieldCount + aggregatesIndex
          + val accumulator = accumulators.getField(aggregatesIndex).asInstanceOf[Accumulator]
          + aggregates(aggregatesIndex).accumulate(accumulator, inputs.get(dataListIndex).getField(aggFields(aggregatesIndex)))
+ if (dataListIndex >= (inputs.size() - 1)) {
+ output.setField(index, aggregates(aggregatesIndex).getValue(accumulator))
+ }

          + aggregatesIndex += 1
          + }
          + dataListIndex += 1
          + }
          +
          +
          + dataListIndex = 0
          + while (dataListIndex < inputs.size()) {
          + out.collect(output)
          — End diff –

          We have to copy the forwarded fields for each row in `inputs` because each row might be different. The aggregates can be reused, because they are all the same for all records with the same time.
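A sketch of an emit loop along those lines, reusing the names from the diff above (illustrative only, not the final code of the PR):

// Sketch: copy the forwarded fields of each individual row before emitting it;
// the aggregate fields were already set once for this timestamp and are reused.
var i = 0
while (i < inputs.size()) {
  val row = inputs.get(i)
  var k = 0
  while (k < forwardedFieldCount) {
    output.setField(k, row.getField(k))
    k += 1
  }
  out.collect(output)
  i += 1
}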

          Show
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3629#discussion_r108467898

          --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala ---
          @@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util.{List => JList, ArrayList => JArrayList}
+
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+ * Process Function for RANGE clause event-time bounded OVER window
+ *
+ * @param aggregates the list of all [[AggregateFunction]] used for this aggregation
+ * @param aggFields the position (in the input Row) of the input value for each aggregate
+ * @param forwardedFieldCount the count of forwarded fields.
+ * @param aggregationStateType the row type info of aggregation
+ * @param inputRowType the row type info of input row
+ * @param precedingOffset the preceding offset
+ */
+class RangeClauseBoundedOverProcessFunction(
+    private val aggregates: Array[AggregateFunction[_]],
+    private val aggFields: Array[Int],
+    private val forwardedFieldCount: Int,
+    private val aggregationStateType: RowTypeInfo,
+    private val inputRowType: RowTypeInfo,
+    private val precedingOffset: Long)
+  extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkNotNull(forwardedFieldCount)
+  Preconditions.checkNotNull(aggregationStateType)
+  Preconditions.checkNotNull(precedingOffset)
+
+  private var output: Row = _
+
+  // the state which keeps the last triggering timestamp
+  private var lastTriggeringTsState: ValueState[Long] = _
+
+  // the state which used to materialize the accumulator for incremental calculation
+  private var accumulatorState: ValueState[Row] = _
+
+  // the state which keeps all the data that are not expired.
+  // The first element (as the mapState key) of the tuple is the time stamp. Per each time stamp,
+  // the second element of tuple is a list that contains the entire data of all the rows belonging
+  // to this time stamp.
+  private var dataState: MapState[Long, JList[Row]] = _
+
+  override def open(config: Configuration) {
+
+    output = new Row(forwardedFieldCount + aggregates.length)
+
+    val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
+    lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
+
+    val accumulatorStateDescriptor =
+      new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
+    accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
+
+    val keyTypeInformation: TypeInformation[Long] =
+      BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
+    val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputRowType)
+
+    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]](
+        "dataState",
+        keyTypeInformation,
+        valueTypeInformation)
+
+    dataState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+  }
+
+  override def processElement(
+    input: Row,
+    ctx: ProcessFunction[Row, Row]#Context,
+    out: Collector[Row]): Unit = {
+
+    // triggering timestamp for trigger calculation
+    val triggeringTs = ctx.timestamp
+
+    val lastTriggeringTs = lastTriggeringTsState.value
+
+    // check if the data is expired, if not, save the data and register event time timer
+    if (triggeringTs > lastTriggeringTs) {
+      val data = dataState.get(triggeringTs)
+      if (null != data) {
+        data.add(input)
+        dataState.put(triggeringTs, data)
+      } else {
+        val data = new JArrayList[Row]
+        data.add(input)
+        dataState.put(triggeringTs, data)
+        // register event time timer
+        ctx.timerService.registerEventTimeTimer(triggeringTs)
+      }
+    }
+  }
+
+  override def onTimer(
+    timestamp: Long,
+    ctx: ProcessFunction[Row, Row]#OnTimerContext,
+    out: Collector[Row]): Unit = {
+    // gets all window data from state for the calculation
+    val inputs: JList[Row] = dataState.get(timestamp)
+
+    if (null != inputs) {
+
+      var accumulators = accumulatorState.value
+      var dataListIndex = 0
+      var aggregatesIndex = 0
+
+      // initialize when first run or failover recovery per key
+      if (null == accumulators) {
+        accumulators = new Row(aggregates.length)
+        aggregatesIndex = 0
+        while (aggregatesIndex < aggregates.length) {
+          accumulators.setField(aggregatesIndex, aggregates(aggregatesIndex).createAccumulator())
+          aggregatesIndex += 1
+        }
+      }
+
+      // keep up timestamps of retract data
+      val retractTsList: JList[Long] = new JArrayList[Long]
+
+      val dataTimestampIt = dataState.keys.iterator
+      while (dataTimestampIt.hasNext) {
+        val dataTs: Long = dataTimestampIt.next()
+        val offset = timestamp - dataTs
+        if (offset > precedingOffset) {
+          val retractDataList = dataState.get(dataTs)
+          dataListIndex = 0
+          while (dataListIndex < retractDataList.size()) {
+            aggregatesIndex = 0
+            while (aggregatesIndex < aggregates.length) {
+              val accumulator = accumulators.getField(aggregatesIndex).asInstanceOf[Accumulator]
+              aggregates(aggregatesIndex)
+                .retract(accumulator, retractDataList.get(dataListIndex).getField(aggFields(aggregatesIndex)))
+              aggregatesIndex += 1
+            }
+            dataListIndex += 1
+            retractTsList.add(dataTs)
+          }
+        }
+      }
+
+      // remove the data that has been retracted
+      dataListIndex = 0
+      while (dataListIndex < retractTsList.size) {
+        dataState.remove(retractTsList.get(dataListIndex))
+        dataListIndex += 1
+      }
+
+      // copy forwarded fields to output row
+      aggregatesIndex = 0
+      while (aggregatesIndex < forwardedFieldCount) {
+        output.setField(aggregatesIndex, inputs.get(0).getField(aggregatesIndex))
+        aggregatesIndex += 1
+      }
+
+      dataListIndex = 0
+      while (dataListIndex < inputs.size()) {
+        // accumulate current row and set aggregate in output row
+        aggregatesIndex = 0
+        while (aggregatesIndex < aggregates.length) {
+          val index = forwardedFieldCount + aggregatesIndex
+          val accumulator = accumulators.getField(aggregatesIndex).asInstanceOf[Accumulator]
+          aggregates(aggregatesIndex).accumulate(accumulator, inputs.get(dataListIndex).getField(aggFields(aggregatesIndex)))
+          if (dataListIndex >= (inputs.size() - 1)) {
+            output.setField(index, aggregates(aggregatesIndex).getValue(accumulator))
+          }
+          aggregatesIndex += 1
+        }
+        dataListIndex += 1
+      }
+
+      dataListIndex = 0
+      while (dataListIndex < inputs.size()) {
+        out.collect(output)
          --- End diff ---

          We have to copy the forwarded fields for each row in `inputs` because each row might be different. The aggregates can be reused, because they are all the same for all records with the same time.
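
          For illustration, a minimal sketch of an emit loop that copies the forwarded fields per row, reusing the names from the diff above (inputs, output, forwardedFieldCount, out). This is only a sketch of the reviewer's suggestion, not necessarily the code that was eventually merged:

// sketch only: the forwarded (non-aggregated) fields can differ per input row,
// so they are copied for every emitted row; the aggregate fields of `output`
// have already been set and are identical for all rows with this timestamp
var rowIndex = 0
while (rowIndex < inputs.size()) {
  val inputRow = inputs.get(rowIndex)
  var fieldIndex = 0
  while (fieldIndex < forwardedFieldCount) {
    output.setField(fieldIndex, inputRow.getField(fieldIndex))
    fieldIndex += 1
  }
  out.collect(output)
  rowIndex += 1
}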
          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/3629

          Hi @fhueske, thanks a lot for your review. I have updated the PR according to your comments.
          There is one thing that needs your explanation, i.e.:
          `Also, I realized that this implementation (other OVER windows are probably affected as well) will not discard state if the key space evolves. We should add a JIRA to add a configuration parameter to remove state if no row was received for a certain amount of time.`
          IMO, for the event-time case we manage state in a data-driven way: processing each record also processes the expired data, so as long as data keeps arriving the state is cleaned up promptly, and if no data arrives for a long time the state does not grow either. Since we cannot know when the next record will arrive, I think we should not clear the remaining data, because removing it would make the next calculation incorrect. If the state had TTL settings, the user could configure a TTL to clear the state in a controlled way. If I understand you correctly, the configuration parameter you mentioned is such a TTL config, is that correct? If not, I would appreciate it if you could share your detailed thoughts.

          Thanks,
          SunJincheng
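
          To make the data-driven expiry concrete, here is a small worked example based on the retract loop in the diff above, with illustrative values (a 1 hour RANGE window, i.e. precedingOffset = 3600000 ms):

              precedingOffset = 3600000 ms (1 hour)
              dataState keys (event timestamps): 08:30, 09:00, 09:30, 10:00
              when the event-time timer for 10:00 fires:
                offset(08:30) = 90 min, > 60 min      -> rows at 08:30 are retracted and removed from dataState
                offset(09:00) = 60 min, not > 60 min  -> rows at 09:00 stay in the window
                offset(09:30) = 30 min                -> stay
              so the aggregates emitted at 10:00 cover the rows with timestamps in [09:00, 10:00],
              i.e. RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW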

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3629

          The problem with the state clean-up is that we always keep some state for each key. If the keys of the stream are moving (i.e., there is not a fixed set of keys) then the size of the state will grow infinitely.

          So we should add a configuration option that defines when the state of a key can be cleaned, i.e., a min and max duration for how long we keep the state of a key without any updates. This should be similar to the RocksDB TTL, but I would like to have this not only for RocksDB but for all backends, by registering timers that clear the state.

          I don't think we have to add this for this PR but should address this issue before the 1.3 release.
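
          As a rough illustration of that idea, the process function above could register a processing-time clean-up timer per key and drop the key's state when the timer fires without any newer update. This is a sketch only; minRetentionTime, maxRetentionTime and cleanupTimeState are hypothetical names and not part of this PR:

// hypothetical sketch, meant to live inside the OVER window process function above;
// cleanupTimeState would be initialized in open() with a ValueStateDescriptor (omitted here)
private val minRetentionTime: Long = 12 * 3600 * 1000L  // example value, would come from config
private val maxRetentionTime: Long = 24 * 3600 * 1000L  // example value, would come from config
private var cleanupTimeState: ValueState[java.lang.Long] = _

// called from processElement(): (re)register the clean-up timer for the current key
def registerCleanupTimer(ctx: ProcessFunction[Row, Row]#Context): Unit = {
  val currentTime = ctx.timerService.currentProcessingTime
  val curCleanupTime = cleanupTimeState.value
  // only move the timer if the registered one would fire before minRetentionTime has passed,
  // so that a new timer is not registered for every single record
  if (curCleanupTime == null || currentTime + minRetentionTime > curCleanupTime.longValue()) {
    val cleanupTime = currentTime + maxRetentionTime
    ctx.timerService.registerProcessingTimeTimer(cleanupTime)
    cleanupTimeState.update(cleanupTime)
  }
}

// called from onTimer(): if this is the latest registered clean-up timer, the key has been
// idle for at least minRetentionTime and its state can be dropped; a real implementation
// would also have to distinguish these processing-time timers from the event-time timers above
def cleanupStateIfExpired(timestamp: Long): Unit = {
  val curCleanupTime = cleanupTimeState.value
  if (curCleanupTime != null && timestamp == curCleanupTime.longValue()) {
    dataState.clear()
    accumulatorState.clear()
    lastTriggeringTsState.clear()
    cleanupTimeState.clear()
  }
}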

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/3629

          Hi @fhueske, understood. I will seriously consider the feature you described and then give you feedback in a new thread.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3629

          Thanks for the update @sunjincheng121.
          This PR is good to merge.

          githubbot ASF GitHub Bot added a comment -

          Github user greghogan commented on the issue:

          https://github.com/apache/flink/pull/3629

          @sunjincheng121 tests are failing for scalastyle line length violations.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3629

          Ah, thanks @greghogan for pointing this out!

          @sunjincheng121, I'll fix the style violation before merging.

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/3629

          Thanks very much. @fhueske @greghogan

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3629

          fhueske Fabian Hueske added a comment -

          Implemented with d4665a00a4262f89b166895f73a54daab2f25e1c


            People

            • Assignee:
              sunjincheng121 sunjincheng
              Reporter:
              fhueske Fabian Hueske
            • Votes:
              0
              Watchers:
              7

              Dates

              • Created:
                Updated:
                Resolved:

                Development