  1. Flink
  2. FLINK-4557 Table API Stream Aggregations
  3. FLINK-5990

Add [partitioned] event time OVER ROWS BETWEEN x PRECEDING aggregation to SQL

    Details

    • Type: Sub-task
    • Status: Closed
    • Priority: Major
    • Resolution: Implemented
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Table API & SQL
    • Labels: None

      Description

      The goal of this issue is to add support for OVER ROWS aggregations on event time streams to the SQL interface.

      Queries similar to the following should be supported:

      SELECT 
        a, 
        SUM(b) OVER (PARTITION BY c ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumB,
        MIN(b) OVER (PARTITION BY c ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS minB
      FROM myStream
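
      For reference, such a statement would be submitted through the Table API's SQL entry point. A minimal usage sketch (assuming a StreamTableEnvironment `tableEnv` with `myStream` registered as a table; these names are illustrative, not part of this issue's code):

```
// Submit the OVER ROWS query against a registered stream table (sketch only).
val result: Table = tableEnv.sql(
  """
    |SELECT
    |  a,
    |  SUM(b) OVER (PARTITION BY c ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumB,
    |  MIN(b) OVER (PARTITION BY c ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS minB
    |FROM myStream
  """.stripMargin)
```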
      

      The following restrictions should initially apply:

      • All OVER clauses in the same SELECT clause must be exactly the same.
      • The PARTITION BY clause is required.
      • The ORDER BY clause may only have rowTime() as a parameter. rowTime() is a parameterless scalar function that just indicates event time mode.
      • UNBOUNDED PRECEDING is not supported (see FLINK-5803).
      • FOLLOWING is not supported.

      The restrictions will be resolved in follow-up issues. If we find that some of the restrictions are trivial to address, we can add the functionality in this issue as well.

      This issue includes:

      • Design of the DataStream operator to compute OVER ROWS aggregates
      • Translation from Calcite's RelNode representation (LogicalProject with RexOver expression).

        Issue Links

          Activity

          sunjincheng121 sunjincheng added a comment -

          Fabian Hueske I think the ROWS clause

          OVER ROWS BETWEEN x PRECEDING

          and the RANGE clause

          OVER RANGE BETWEEN x PRECEDING

          are a little different, so I created this JIRA. Does that make sense to you?

          Best,
          SunJincheng

          fhueske Fabian Hueske added a comment -

          Actually, we decided not to cover this case because it does not handle late-arriving data well. In fact, all records that follow a late record need to be recomputed, which is too expensive.
          See https://lists.apache.org/thread.html/1e72095524ed3845d66a980e57a009d8f2f907624b1c843e12ea6a94@%3Cdev.flink.apache.org%3E

          sunjincheng121 sunjincheng added a comment -

          Yes, Fabian Hueske, it is difficult to deal with out-of-order data. I thought of an approach, but I am not sure whether it is reasonable. The idea is this: in the ProcessFunction we use both a ValueState and a ListState (an ordered state would be best). The ValueState stores the incremental calculation result, the ListState buffers some data (allowedLateness), and, together with the timer service, a timer is registered when each element arrives. I can link a prototype implementation later. What do you think?
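
          A rough sketch of that idea (all names are illustrative assumptions, not the actual prototype): the ValueState carries the running accumulator, the ListState buffers recent rows, and one event-time timer is registered per element.

```
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

// Sketch only: buffer rows in ListState, keep the incremental result in ValueState,
// and let per-element event-time timers drive the emission once the watermark advances.
class BufferedOverSketch(allowedLateness: Long) extends ProcessFunction[Row, Row] {

  private var accState: ValueState[Row] = _    // incremental aggregation result
  private var bufferState: ListState[Row] = _  // rows buffered within allowedLateness

  override def open(config: Configuration): Unit = {
    accState = getRuntimeContext.getState(new ValueStateDescriptor[Row]("acc", classOf[Row]))
    bufferState = getRuntimeContext.getListState(new ListStateDescriptor[Row]("buffer", classOf[Row]))
  }

  override def processElement(
      in: Row, ctx: ProcessFunction[Row, Row]#Context, out: Collector[Row]): Unit = {
    bufferState.add(in)                                     // buffer the element
    ctx.timerService.registerEventTimeTimer(ctx.timestamp)  // one timer per element
  }

  override def onTimer(
      ts: Long, ctx: ProcessFunction[Row, Row]#OnTimerContext, out: Collector[Row]): Unit = {
    // when the watermark passes ts: fold the buffered rows into accState, emit results,
    // and drop rows that are older than allowedLateness (details omitted in this sketch).
  }
}
```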

          sunjincheng121 sunjincheng added a comment - - edited

          In this JIRA I'll define two concepts:
          1. out-of-order
          2. late-event

          For these two concepts, we need to add a configuration:
          1. allowedLateness: a user-defined value that specifies the length of time by which data is allowed to be delayed.

          Whether an element is out-of-order or a late-event is determined based on the value of allowedLateness,
          e.g.:

          • allowedLateness = 2
          • InputData:
            ```
            (1L, 1, "Hello"),
            (2L, 2, "Hello"),
            (4L, 4, "Hello"),
            (3L, 3, "Hello"),
            (7L, 7, "Hello"),
            (7L, 8, "Hello"),
            (5L, 5, "Hello"),
            (8L, 8, "Hello World"),
            (20L, 20, "Hello World"),
            (9L, 9, "Hello World"))
            ```

          `(3L, 3, "Hello")` is out-of-order, because 4 - 3 = 1 < 2.
          `(9L, 9, "Hello World")` is a late-event, because 20 - 9 = 11 > 2.
          What do you think? @fhueske
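
          A minimal sketch of that classification rule (the comparison against the largest timestamp seen so far and the strictness of the bounds follow my reading of the example above; they are assumptions, not a fixed decision):

```
// Classify an element relative to the largest event timestamp seen so far.
def classify(eventTs: Long, maxSeenTs: Long, allowedLateness: Long): String =
  if (eventTs >= maxSeenTs) "in-order"
  else if (maxSeenTs - eventTs < allowedLateness) "out-of-order"  // buffered and still processed
  else "late-event"                                               // too old, handled separately

// With allowedLateness = 2:
// classify(3L, 4L, 2)  == "out-of-order"  (4 - 3 = 1 < 2)
// classify(9L, 20L, 2) == "late-event"    (20 - 9 = 11 > 2)
```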

          sunjincheng121 sunjincheng added a comment -

          Fabian Hueske I have removed the link to the prototype implementation, because I realized that I should open a PR so that your comments can be seen by all community members and your suggestions can be shared.

          Best,
          SunJincheng

          githubbot ASF GitHub Bot added a comment -

          GitHub user sunjincheng121 opened a pull request:

          https://github.com/apache/flink/pull/3585

          FLINK-5990 [table] Add event time OVER ROWS BETWEEN x PRECEDING aggregation to SQL.

          Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
          If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
          In addition to going through the list, please provide a meaningful description of your changes.

          • [×] General
            • The pull request references the related JIRA issue ("FLINK-5990 Add event time OVER ROWS BETWEEN x PRECEDING aggregation to SQL")
            • The pull request addresses only one issue
            • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [ ] Documentation
            • Documentation has been added for new functionality
            • Old documentation affected by the pull request has been updated
            • JavaDoc for public methods has been added
          • [×] Tests & Build
            • Functionality added by the pull request is covered by tests
            • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/sunjincheng121/flink FLINK-5990-PR

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3585.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3585


          commit 3123078cbe3c108d145adbe2b898891235d9e53c
          Author: 金竹 <jincheng.sunjc@alibaba-inc.com>
          Date: 2017-03-19T15:31:00Z

          FLINK-5990[table]Add event time OVER ROWS BETWEEN x PRECEDING aggregation to SQL.


          sunjincheng121 sunjincheng added a comment -

          Hi, Fabian Hueske I have opened PR #3585, so that your comments can be shared with all community members. I think your comments are helpful for all related JIRAs.

          Thanks,
          SunJincheng

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3585

          Hi @sunjincheng121, thanks for this PR.

          To be honest, I don't completely understand the implementation of the `RowsClauseBoundedOverProcessFunction`.

          I thought about another design that I would like to discuss (not considering processing time, which should be addressed in a separate ProcessFunction, IMO):

          • we have three state objects: 1) the accumulator row, 2) a MapState[Long, List[Row]] for not-yet-processed data (`toProcess`), 3) a MapState[Long, List[Row]] for processed data which needs to be retracted (`toRetract`).
          • processElement() puts the element in the `toProcess` MapState with the original timestamp and registers a timer for `currentWatermark() + 1`. Hence, we only have a single timer which triggers when the next watermark is reached.
          • onTimer() is called for the next watermark. We get an iterator over the `toProcess` MapState. For RocksDB the iterator is sorted on the key. We sort-insert the records from the iterator into a `LinkedList` (since the iterator is sorted for RocksDB this will be a simple append; for other state backends it will be more expensive, but we can tolerate that, IMO). We do the same for the `toRetract` MapState. So we have two sorted lists for data to accumulate and to retract. We go over both sorted lists and accumulate and retract for each step using the accumulator state. Then we emit the new row and move the emitted row from the `toProcess` MapState to the `toRetract` MapState.

          This design has the benefit of using RocksDB to sort. Moreover, we could also put only those fields into the toRetract state that need to be retracted instead of the full row.
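
          A skeleton of that design could look roughly as follows (a sketch only; the class and state names are assumptions and the onTimer body is elided, so this is not the actual operator):

```
import java.util.{List => JList}

import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

// Sketch of the proposed design: one accumulator row plus two MapStates keyed by timestamp.
class EventTimeBoundedOverSketch extends ProcessFunction[Row, Row] {

  private var accumulatorState: ValueState[Row] = _      // 1) the accumulator row
  private var toProcess: MapState[Long, JList[Row]] = _  // 2) not-yet-processed rows
  private var toRetract: MapState[Long, JList[Row]] = _  // 3) emitted rows pending retraction

  override def open(config: Configuration): Unit = {
    accumulatorState = getRuntimeContext.getState(
      new ValueStateDescriptor[Row]("acc", classOf[Row]))
    toProcess = getRuntimeContext.getMapState(
      new MapStateDescriptor[Long, JList[Row]]("toProcess", classOf[Long], classOf[JList[Row]]))
    toRetract = getRuntimeContext.getMapState(
      new MapStateDescriptor[Long, JList[Row]]("toRetract", classOf[Long], classOf[JList[Row]]))
  }

  override def processElement(
      in: Row, ctx: ProcessFunction[Row, Row]#Context, out: Collector[Row]): Unit = {
    // buffer the element under its event timestamp and wait for the next watermark
    val ts = ctx.timestamp
    val rows = if (toProcess.contains(ts)) toProcess.get(ts) else new java.util.ArrayList[Row]()
    rows.add(in)
    toProcess.put(ts, rows)
    // a single timer that fires when the next watermark arrives
    ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1)
  }

  override def onTimer(
      ts: Long, ctx: ProcessFunction[Row, Row]#OnTimerContext, out: Collector[Row]): Unit = {
    // sort-insert the toProcess and toRetract entries into two lists, then per step:
    // retract the oldest row, accumulate the new row, emit, and move the emitted row
    // from toProcess to toRetract (details omitted in this sketch).
  }
}
```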

          What do you think about this approach?

          Thanks, Fabian

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/3585

          Hi, @fhueske Thanks for your attention to the PR. Today I'll propose the design doc and link it here.

          Best,
          SunJIncheng

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/3585

          Hi, @fhueske When I started to write a design document, I compared the differences between your proposed design and the current implementation I propose, and I finally decided to explain this directly in JIRA instead of a separate design doc. If I understand you correctly, the differences between our designs are as follows:

          • Regarding the state:
            • 1) the accumulator row -> accumulatorState (we are the same)
            • 2) a MapState[Long, List[Row]] for `toProcess` -> I did not use this; as I mentioned below, we do not need a sorted map for accumulate, so it can share the same state (in my case, I just use one dataState)
            • 3) a MapState[Long, List[Row]] for `toRetract` -> (we are the same)
          • About the core processing logic:
            • processElement:
              1) Check if the data is late.
              2) Register a timer.
            • onTimer:
              1) toRetract -> retract (we are the same)
              2) toProcess -> accumulate (we are the same)
          • About sorting with MapState:
            • toProcess -> accumulate (no need to sort)
            • toRetract -> retract (needs sorting). The purpose of sorting in `retract` is to find expired data. For example, for a `2 PRECEDING` over window, the dataState is (0 -> row, 1 -> row, 2 -> row, 3 -> row); when handling element `3` we need to find element `0` and `retract` it.

          If the state backend is `RocksDB` and the key is of Long type, then luckily the data is sorted by key, as ensured by RocksDB. But if we use another backend, such as the heap backend, it may not be sorted. So relying on the MapState key order has some limitations, unless we implement a `sortedMapState` whose semantics guarantee ordered keys on any backend. Therefore my current design traverses dataState.keys to find the smallest one for retraction.

          • About registering timers:
            I would like to use the element `timestamp` to register the timer. For example, for `(7L, 7, "Hello")` I use `7` to register the timer. IMO, we should not use `currentWatermark + 1` to register the timer, as the currentWatermark cannot represent the event time of each element. If we explicitly use `currentWatermark + 1`, there will be problems. For instance, two elements with different event times could arrive at the same time; then we would register the same timer (because the currentWatermark has not been updated in between), and eventually these two elements with different event times would be emitted at the same time, which is unexpected.
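
          A minimal fragment isolating that per-timestamp registration (simplified from the processElement in the PR diff below):

```
// Register one event-time timer per distinct element timestamp, not per watermark.
val triggeringTs = ctx.timestamp
if (triggeringTs > lastTriggeringTsState.value &&
    triggeringTs > ctx.timerService.currentWatermark) {
  // buffer the row under triggeringTs in dataState (see the diff for details) ...
  ctx.timerService.registerEventTimeTimer(triggeringTs)  // fires once the watermark passes it
}
```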

          FYI, examples (using my design) are as follows:
          (Refer to: [`testBoundPartitionedEventTimeWindowWithRowWithLateEvent`](https://github.com/sunjincheng121/flink/blob/FLINK-5990-PR/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala))

          • Sql:
            ` SELECT +c, a, sum(a) OVER (PARTITION BY c ORDER BY RowTime() ROWS BETWEEN 2 preceding AND CURRENT ROW) from T1`
          • TestData:
            ```
            val data = List(
            (1L, 1, "Hello"),
            (2L, 2, "Hello"),
            (4L, 4, "Hello"),
            (3L, 3, "Hello"),
            (7L, 7, "Hello"),
            (8L, 8, "Hello World"),
            (7L, 8, "Hello"),
            (5L, 5, "Hello"),
            (20L, 20, "Hello World"),
            (9L, 9, "Hello World"))
            ```
          • WaterMark:
            ```
            (1L, 1, "Hello") -> -1
            (2L, 2, "Hello") -> 0
            (4L, 4, "Hello") -> 2
            (3L, 3, "Hello") -> 1
            (7L, 7, "Hello") -> 5
            (8L, 8, "Hello World") -> 6
            (7L, 8, "Hello") -> 5
            (5L, 5, "Hello") -> 3
            (20L, 20, "Hello World") -> 18
            (9L, 9, "Hello World")) -> 7
            ```
          • Print info in `processElement` and `onTimer`:

          ```
          process_ts_lastTs_wm:[1][0][-9223372036854775808]
          process_ts_lastTs_wm:[2][0][-1]
          process_ts_lastTs_wm:[4][0][0]
          ====>ontime_ts_wm_triggerTs_lastRow:[1][2][null][null]
          ====>ontime_ts_wm_triggerTs_lastRow:[2][2][1][null]
          process_ts_lastTs_wm:[3][2][2]
          process_ts_lastTs_wm:[7][2][2]
          ====>ontime_ts_wm_triggerTs_lastRow:[3][5][2][null]
          ====>ontime_ts_wm_triggerTs_lastRow:[4][5][3][(1,Hello)]
          process_ts_lastTs_wm:[8][0][5]
          process_ts_lastTs_wm:[7][4][6]
          <<late_ts_lastTs_wm:[5][4][6]
          process_ts_lastTs_wm:[20][0][6]
          ====>ontime_ts_wm_triggerTs_lastRow:[7][18][4][(2,Hello)]
          ====>ontime_ts_wm_triggerTs_lastRow:[7][18][4][(3,Hello)]
          ====>ontime_ts_wm_triggerTs_lastRow:[8][18][null][null]
          <<late_ts_lastTs_wm:[9][8][18]
          ====>ontime_ts_wm_triggerTs_lastRow:[20][9223372036854775807][8][null]
          ```
          BTW, I updated the code and separated the proc-time and event-time logic into two classes.

          I have tried to give a detailed description, but I am not sure if it is clear enough; feel free to give me feedback if anything is unclear.

          Thanks,
          SunJincheng

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3585#discussion_r107430676

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala —
          @@ -0,0 +1,207 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.util.{ArrayList, List => JList}
          +
          +import org.apache.flink.api.common.state._
          +import org.apache.flink.api.java.typeutils.RowTypeInfo
          +import org.apache.flink.configuration.Configuration
          +import org.apache.flink.streaming.api.functions.ProcessFunction
          +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
          +import org.apache.flink.types.Row
          +import org.apache.flink.util.{Collector, Preconditions}

          +
          +/**
          + * Process Function for ROWS clause event-time bounded OVER window
          + *
          + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
          + * used for this aggregation
          + * @param aggFields the position (in the input Row) of the input value for each aggregate
          + * @param forwardedFieldCount the count of forwarded fields.
          + * @param aggregationStateType the row type info of aggregation
          + * @param precedingOffset the preceding offset
          + */
          +class RowsClauseBoundedOverProcessFunction(
          + private val aggregates: Array[AggregateFunction[_]],
          + private val aggFields: Array[Int],
          + private val forwardedFieldCount: Int,
          + private val aggregationStateType: RowTypeInfo,
          + private val precedingOffset: Int)
          + extends ProcessFunction[Row, Row] {
          +
          + Preconditions.checkNotNull(aggregates)
          + Preconditions.checkNotNull(aggFields)
          + Preconditions.checkArgument(aggregates.length == aggFields.length)
          + Preconditions.checkNotNull(forwardedFieldCount)
          + Preconditions.checkNotNull(aggregationStateType)
          + Preconditions.checkNotNull(precedingOffset)
          +
          + private var output: Row = _
          +
          + // the state which keeps the last triggering timestamp
          + private var lastTriggeringTsState: ValueState[Long] = _
          +
          + // the state which keeps the count of data
          + private var dataCountState: ValueState[Long] = null
          +
          + // the state which used to materialize the accumulator for incremental calculation
          + private var accumulatorState: ValueState[Row] = _
          +
          + // the state which keeps all the data that are not expired.
          + // The first element (as the mapState key) of the tuple is the time stamp. Per each time stamp,
          + // the second element of tuple is a list that contains the entire data of all the rows belonging
          + // to this time stamp.
          + private var dataState: MapState[Long, JList[Row]] = _
          +
          + override def open(config: Configuration) {
          +
          +   output = new Row(forwardedFieldCount + aggregates.length)
          +
          +   val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
          +     new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
          +   lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
          +
          +   val dataCountStateDescriptor =
          +     new ValueStateDescriptor[Long]("dataCountState", classOf[Long])
          +   dataCountState = getRuntimeContext.getState(dataCountStateDescriptor)
          +
          +   val accumulatorStateDescriptor =
          +     new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
          +   accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
          +
          +   val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
          +     new MapStateDescriptor[Long, JList[Row]](
          +       "dataState",
          +       classOf[Long],
          +       classOf[JList[Row]])
          +
          +   dataState = getRuntimeContext.getMapState(mapStateDescriptor)
          + }

          +
          + override def processElement(
          + input: Row,
          + ctx: ProcessFunction[Row, Row]#Context,
          + out: Collector[Row]): Unit = {
          +
          + // triggering timestamp for trigger calculation
          + val triggeringTs = ctx.timestamp
          +
          + val lastTriggeringTs = lastTriggeringTsState.value
          + // check if the data is expired, if not, save the data and register event time timer
          + if (triggeringTs > lastTriggeringTs && triggeringTs > ctx.timerService.currentWatermark) {
          + if (dataState.contains(triggeringTs)) {
          +   val data = dataState.get(triggeringTs)
          +   data.add(input)
          +   dataState.put(triggeringTs, data)
          + } else {
          +   val data = new ArrayList[Row]
          +   data.add(input)
          +   dataState.put(triggeringTs, data)
          +   // register event time timer
          +   ctx.timerService.registerEventTimeTimer(triggeringTs)
          + }
          + }
          + }
          +
          + override def onTimer(
          + timestamp: Long,
          + ctx: ProcessFunction[Row, Row]#OnTimerContext,
          + out: Collector[Row]): Unit = {
          +
          + // gets all window data from state for the calculation
          + val inputs: JList[Row] = dataState.get(timestamp)
          + if (null != inputs) {
          + var j: Int = 0
          + while (j < inputs.size) {
          + val input = inputs.get(j)
          + var accumulators = accumulatorState.value
          +
          + // initialize when first run or failover recovery per key
          + if (null == accumulators) {
          + accumulators = new Row(aggregates.length)
          + var i = 0
          + while (i < aggregates.length) {
          +   accumulators.setField(i, aggregates(i).createAccumulator)
          +   i += 1
          + }
          + }
          +
          + var dataCount = dataCountState.value + 1
          + dataCountState.update(dataCount)
          +
          + var lastExpiredRow: Row = null
          +
          + if (dataCount > precedingOffset) {
          + val dataTimestampIt = dataState.keys.iterator
          + var expiredDataTs: Long = Long.MaxValue
          + while (dataTimestampIt.hasNext) {
          + val dataTs = dataTimestampIt.next
          + if (dataTs < expiredDataTs) {
          +   expiredDataTs = dataTs
          + }
          + }
          + val windowDataList = dataState.get(expiredDataTs)
          +
          + lastExpiredRow = windowDataList.get(0)
          +
          + windowDataList.remove(0)
          +
          + if (windowDataList.size > 0) {
          +   dataState.put(expiredDataTs, windowDataList)
          + } else {
          +   dataState.remove(expiredDataTs)
          + }
          + }
          +
          + var i = 0
          + while (i < forwardedFieldCount) {
          +   output.setField(i, input.getField(i))
          +   i += 1
          + }

          +
          + if (null != lastExpiredRow) {
          + i = 0
          + while (i < aggregates.length) {
          +   val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
          +   val agg = aggregates(i)
          +   aggregates(i).retract(accumulator, lastExpiredRow.getField(aggFields(i)))
          +   i += 1
          + }
          + }
          +
          + i = 0
          + while (i < aggregates.length) {
          +   val index = forwardedFieldCount + i
          +   val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
          +   aggregates(i).accumulate(accumulator, input.getField(aggFields(i)))
          +   output.setField(index, aggregates(i).getValue(accumulator))
          +   i += 1
          + }

          + accumulatorState.update(accumulators)
          — End diff –

          We only need to update the state once we leave the method.
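
          To illustrate the suggestion, a sketch based on the variables in the diff (not a full patch):

```
// Read the accumulator row once, mutate the local copy inside the per-row loop,
// and write it back to ValueState a single time after the loop has finished.
var accumulators = accumulatorState.value
var j = 0
while (j < inputs.size) {
  val input = inputs.get(j)
  // ... retract the expired row, accumulate `input`, emit the output row ...
  j += 1
}
accumulatorState.update(accumulators)  // one state write per timer firing instead of one per row
```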

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3585#discussion_r107429643

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala —
          @@ -0,0 +1,207 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.util.{ArrayList, List => JList}
          +
          +import org.apache.flink.api.common.state._
          +import org.apache.flink.api.java.typeutils.RowTypeInfo
          +import org.apache.flink.configuration.Configuration
          +import org.apache.flink.streaming.api.functions.ProcessFunction
          +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
          +import org.apache.flink.types.Row
          +import org.apache.flink.util.{Collector, Preconditions}

          +
          +/**
          + * Process Function for ROWS clause event-time bounded OVER window
          + *
          + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
          + * used for this aggregation
          + * @param aggFields the position (in the input Row) of the input value for each aggregate
          + * @param forwardedFieldCount the count of forwarded fields.
          + * @param aggregationStateType the row type info of aggregation
          + * @param precedingOffset the preceding offset
          + */
          +class RowsClauseBoundedOverProcessFunction(
          + private val aggregates: Array[AggregateFunction[_]],
          + private val aggFields: Array[Int],
          + private val forwardedFieldCount: Int,
          + private val aggregationStateType: RowTypeInfo,
          + private val precedingOffset: Int)
          + extends ProcessFunction[Row, Row] {
          +
          + Preconditions.checkNotNull(aggregates)
          + Preconditions.checkNotNull(aggFields)
          + Preconditions.checkArgument(aggregates.length == aggFields.length)
          + Preconditions.checkNotNull(forwardedFieldCount)
          + Preconditions.checkNotNull(aggregationStateType)
          + Preconditions.checkNotNull(precedingOffset)
          +
          + private var output: Row = _
          +
          + // the state which keeps the last triggering timestamp
          + private var lastTriggeringTsState: ValueState[Long] = _
          +
          + // the state which keeps the count of data
          + private var dataCountState: ValueState[Long] = null
          +
          + // the state which used to materialize the accumulator for incremental calculation
          + private var accumulatorState: ValueState[Row] = _
          +
          + // the state which keeps all the data that are not expired.
          + // The first element (as the mapState key) of the tuple is the time stamp. Per each time stamp,
          + // the second element of tuple is a list that contains the entire data of all the rows belonging
          + // to this time stamp.
          + private var dataState: MapState[Long, JList[Row]] = _
          +
          + override def open(config: Configuration) {
          +
          +   output = new Row(forwardedFieldCount + aggregates.length)
          +
          +   val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
          +     new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
          +   lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
          +
          +   val dataCountStateDescriptor =
          +     new ValueStateDescriptor[Long]("dataCountState", classOf[Long])
          +   dataCountState = getRuntimeContext.getState(dataCountStateDescriptor)
          +
          +   val accumulatorStateDescriptor =
          +     new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
          +   accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
          +
          +   val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
          +     new MapStateDescriptor[Long, JList[Row]](
          +       "dataState",
          +       classOf[Long],
          +       classOf[JList[Row]])
          +
          +   dataState = getRuntimeContext.getMapState(mapStateDescriptor)
          + }

          +
          + override def processElement(
          + input: Row,
          + ctx: ProcessFunction[Row, Row]#Context,
          + out: Collector[Row]): Unit = {
          +
          + // triggering timestamp for trigger calculation
          + val triggeringTs = ctx.timestamp
          +
          + val lastTriggeringTs = lastTriggeringTsState.value
          + // check if the data is expired, if not, save the data and register event time timer
          + if (triggeringTs > lastTriggeringTs && triggeringTs > ctx.timerService.currentWatermark) {
          + if (dataState.contains(triggeringTs)) {
          +   val data = dataState.get(triggeringTs)
          +   data.add(input)
          +   dataState.put(triggeringTs, data)
          + } else {
          +   val data = new ArrayList[Row]
          +   data.add(input)
          +   dataState.put(triggeringTs, data)
          +   // register event time timer
          +   ctx.timerService.registerEventTimeTimer(triggeringTs)
          + }
          + }
          + }
          +
          + override def onTimer(
          + timestamp: Long,
          + ctx: ProcessFunction[Row, Row]#OnTimerContext,
          + out: Collector[Row]): Unit = {
          +
          + // gets all window data from state for the calculation
          + val inputs: JList[Row] = dataState.get(timestamp)
          + if (null != inputs) {
          + var j: Int = 0
          + while (j < inputs.size) {
          + val input = inputs.get(j)
          + var accumulators = accumulatorState.value
          +
          + // initialize when first run or failover recovery per key
          + if (null == accumulators) {
          + accumulators = new Row(aggregates.length)
          + var i = 0
          + while (i < aggregates.length) {
          +   accumulators.setField(i, aggregates(i).createAccumulator)
          +   i += 1
          + }
          + }
          +
          + var dataCount = dataCountState.value + 1
          + dataCountState.update(dataCount)
          +
          + var lastExpiredRow: Row = null
          +
          + if (dataCount > precedingOffset) {
          + val dataTimestampIt = dataState.keys.iterator
          + var expiredDataTs: Long = Long.MaxValue
          — End diff –

          would be good to buffer the `windowDataList` across iterations of the `while` loop until it is empty.
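
          A rough sketch of that suggestion (reusing the diff's variable names; `smallestKeyOf` is an assumed helper standing in for the key scan shown in the diff):

```
// Fetch the oldest list once and drain it across iterations of the outer while loop,
// instead of rescanning dataState.keys for every processed row.
var retractTs: Long = 0L
var retractList: JList[Row] = null
while (j < inputs.size) {
  if (dataCount > precedingOffset) {
    if (retractList == null || retractList.isEmpty) {
      retractTs = smallestKeyOf(dataState)  // assumed helper: minimum over dataState.keys
      retractList = dataState.get(retractTs)
    }
    val lastExpiredRow = retractList.remove(0)
    // ... retract lastExpiredRow from the accumulators ...
  }
  // ... accumulate the current row and emit ...
  j += 1
}
// write back or drop the partially drained list once, after the loop
if (retractList != null && !retractList.isEmpty) dataState.put(retractTs, retractList)
else if (retractList != null) dataState.remove(retractTs)
```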

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3585#discussion_r107433959

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala —
          @@ -0,0 +1,207 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.util.{ArrayList, List => JList}
          +
          +import org.apache.flink.api.common.state._
          +import org.apache.flink.api.java.typeutils.RowTypeInfo
          +import org.apache.flink.configuration.Configuration
          +import org.apache.flink.streaming.api.functions.ProcessFunction
          +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
          +import org.apache.flink.types.Row
          +import org.apache.flink.util.{Collector, Preconditions}

          +
          +/**
          + * Process Function for ROWS clause event-time bounded OVER window
          + *
          + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
          + * used for this aggregation
          + * @param aggFields the position (in the input Row) of the input value for each aggregate
          + * @param forwardedFieldCount the count of forwarded fields.
          + * @param aggregationStateType the row type info of aggregation
          + * @param precedingOffset the preceding offset
          + */
          +class RowsClauseBoundedOverProcessFunction(
          + private val aggregates: Array[AggregateFunction[_]],
          + private val aggFields: Array[Int],
          + private val forwardedFieldCount: Int,
          + private val aggregationStateType: RowTypeInfo,
          + private val precedingOffset: Int)
          + extends ProcessFunction[Row, Row] {
          +
          + Preconditions.checkNotNull(aggregates)
          + Preconditions.checkNotNull(aggFields)
          + Preconditions.checkArgument(aggregates.length == aggFields.length)
          + Preconditions.checkNotNull(forwardedFieldCount)
          + Preconditions.checkNotNull(aggregationStateType)
          + Preconditions.checkNotNull(precedingOffset)
          +
          + private var output: Row = _
          +
          + // the state which keeps the last triggering timestamp
          + private var lastTriggeringTsState: ValueState[Long] = _
          +
          + // the state which keeps the count of data
          + private var dataCountState: ValueState[Long] = null
          +
          + // the state which used to materialize the accumulator for incremental calculation
          + private var accumulatorState: ValueState[Row] = _
          +
          + // the state which keeps all the data that are not expired.
          + // The first element (as the mapState key) of the tuple is the time stamp. Per each time stamp,
          + // the second element of tuple is a list that contains the entire data of all the rows belonging
          + // to this time stamp.
          + private var dataState: MapState[Long, JList[Row]] = _
          +
          + override def open(config: Configuration) {
          +
          + output = new Row(forwardedFieldCount + aggregates.length)
          +
          + val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
          + new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
          + lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
          +
          + val dataCountStateDescriptor =
          + new ValueStateDescriptor[Long]("dataCountState", classOf[Long])
          + dataCountState = getRuntimeContext.getState(dataCountStateDescriptor)
          +
          + val accumulatorStateDescriptor =
          + new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
          + accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
          +
          + val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
          + new MapStateDescriptor[Long, JList[Row]](
          + "dataState",
          + classOf[Long],
          + classOf[JList[Row]])
          +
          + dataState = getRuntimeContext.getMapState(mapStateDescriptor)
          +
          + }
          +
          + override def processElement(
          + input: Row,
          + ctx: ProcessFunction[Row, Row]#Context,
          + out: Collector[Row]): Unit = {
          +
          + // triggering timestamp for trigger calculation
          + val triggeringTs = ctx.timestamp
          +
          + val lastTriggeringTs = lastTriggeringTsState.value
          + // check if the data is expired, if not, save the data and register event time timer
          + if (triggeringTs > lastTriggeringTs && triggeringTs > ctx.timerService.currentWatermark) {
          — End diff –

          `triggeringTs > lastTriggeringTs` will be sufficient, actually `triggeringTs >= lastTriggeringTs`. If `triggeringTs` equals the timestamp of the last element that we emitted, we can still emit the row, even if it is smaller than the current watermark.
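          To make that concrete, a minimal sketch of the relaxed check (the helper name `shouldBuffer` is hypothetical and not part of the PR):

          // buffer the row unless it is older than the last timestamp for which results
          // were already emitted; the current watermark is not consulted at all
          def shouldBuffer(rowTs: Long, lastTriggeringTs: Long): Boolean =
            rowTs >= lastTriggeringTs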

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3585#discussion_r107432524

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala —
          @@ -0,0 +1,207 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.util.{ArrayList, List => JList}
          +
          +import org.apache.flink.api.common.state._
          +import org.apache.flink.api.java.typeutils.RowTypeInfo
          +import org.apache.flink.configuration.Configuration
          +import org.apache.flink.streaming.api.functions.ProcessFunction
          +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
          +import org.apache.flink.types.Row
          +import org.apache.flink.util.{Collector, Preconditions}
          +
          +/**
          + * Process Function for ROWS clause event-time bounded OVER window
          + *
          + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
          + * used for this aggregation
          + * @param aggFields the position (in the input Row) of the input value for each aggregate
          + * @param forwardedFieldCount the count of forwarded fields.
          + * @param aggregationStateType the row type info of aggregation
          + * @param precedingOffset the preceding offset
          + */
          +class RowsClauseBoundedOverProcessFunction(
          + private val aggregates: Array[AggregateFunction[_]],
          + private val aggFields: Array[Int],
          + private val forwardedFieldCount: Int,
          + private val aggregationStateType: RowTypeInfo,
          + private val precedingOffset: Int)
          + extends ProcessFunction[Row, Row] {
          +
          + Preconditions.checkNotNull(aggregates)
          + Preconditions.checkNotNull(aggFields)
          + Preconditions.checkArgument(aggregates.length == aggFields.length)
          + Preconditions.checkNotNull(forwardedFieldCount)
          + Preconditions.checkNotNull(aggregationStateType)
          + Preconditions.checkNotNull(precedingOffset)
          +
          + private var output: Row = _
          +
          + // the state which keeps the last triggering timestamp
          + private var lastTriggeringTsState: ValueState[Long] = _
          +
          + // the state which keeps the count of data
          + private var dataCountState: ValueState[Long] = null
          +
          + // the state which used to materialize the accumulator for incremental calculation
          + private var accumulatorState: ValueState[Row] = _
          +
          + // the state which keeps all the data that are not expired.
          + // The first element (as the mapState key) of the tuple is the time stamp. Per each time stamp,
          + // the second element of tuple is a list that contains the entire data of all the rows belonging
          + // to this time stamp.
          + private var dataState: MapState[Long, JList[Row]] = _
          +
          + override def open(config: Configuration) {
          +
          + output = new Row(forwardedFieldCount + aggregates.length)
          +
          + val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
          + new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
          + lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
          +
          + val dataCountStateDescriptor =
          + new ValueStateDescriptor[Long]("dataCountState", classOf[Long])
          + dataCountState = getRuntimeContext.getState(dataCountStateDescriptor)
          +
          + val accumulatorStateDescriptor =
          + new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
          + accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
          +
          + val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
          + new MapStateDescriptor[Long, JList[Row]](
          + "dataState",
          + classOf[Long],
          + classOf[JList[Row]])
          +
          + dataState = getRuntimeContext.getMapState(mapStateDescriptor)
          +
          + }
          +
          + override def processElement(
          + input: Row,
          + ctx: ProcessFunction[Row, Row]#Context,
          + out: Collector[Row]): Unit = {
          +
          + // triggering timestamp for trigger calculation
          + val triggeringTs = ctx.timestamp
          +
          + val lastTriggeringTs = lastTriggeringTsState.value
          + // check if the data is expired, if not, save the data and register event time timer
          + if (triggeringTs > lastTriggeringTs && triggeringTs > ctx.timerService.currentWatermark) {
          + if (dataState.contains(triggeringTs)) {
          + val data = dataState.get(triggeringTs)
          + data.add(input)
          + dataState.put(triggeringTs, data)
          + } else {
          + val data = new ArrayList[Row]
          + data.add(input)
          + dataState.put(triggeringTs, data)
          + // register event time timer
          + ctx.timerService.registerEventTimeTimer(triggeringTs)
          — End diff –

          In event time, timers are only triggered when a new watermark is received.

          Your implementation relies on the fact that the timers are called in the correct order (which I would hope to be true, but haven't checked).

          If we register a timer for `currentWatermark + 1`, only one timer will be registered and called, so we have to process all records in that call. If we follow your approach, we have multiple timers which will be called one after the other (but at the same logical time, when a watermark is received). In each call, we need to do similar work, which might make this approach less efficient.

          For example, if we register only a single timer, we could sort the key fields of the MapState just once and split them into values to retract and values to emit, whereas we have to go through the keys multiple times if we register multiple timers.
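          One way to read the single-timer variant, as a self-contained sketch: when the one timer registered for `currentWatermark + 1` fires, sort the buffered timestamps once and partition them into those the watermark has passed (to process now) and those that must stay buffered. The names are illustrative only, not part of the PR:

          def splitByWatermark(bufferedTs: Iterable[Long], watermark: Long): (Seq[Long], Seq[Long]) = {
            // sort once per firing; partition preserves the sorted order
            val sorted = bufferedTs.toSeq.sorted
            sorted.partition(_ <= watermark) // (timestamps to process now, timestamps to keep)
          }

          // e.g. splitByWatermark(Seq(7L, 3L, 5L), 5L) returns (Seq(3L, 5L), Seq(7L))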

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3585#discussion_r107429893

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala —
          @@ -0,0 +1,207 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.util.{ArrayList, List => JList}
          +
          +import org.apache.flink.api.common.state._
          +import org.apache.flink.api.java.typeutils.RowTypeInfo
          +import org.apache.flink.configuration.Configuration
          +import org.apache.flink.streaming.api.functions.ProcessFunction
          +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
          +import org.apache.flink.types.Row
          +import org.apache.flink.util.{Collector, Preconditions}
          +
          +/**
          + * Process Function for ROWS clause event-time bounded OVER window
          + *
          + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
          + * used for this aggregation
          + * @param aggFields the position (in the input Row) of the input value for each aggregate
          + * @param forwardedFieldCount the count of forwarded fields.
          + * @param aggregationStateType the row type info of aggregation
          + * @param precedingOffset the preceding offset
          + */
          +class RowsClauseBoundedOverProcessFunction(
          + private val aggregates: Array[AggregateFunction[_]],
          + private val aggFields: Array[Int],
          + private val forwardedFieldCount: Int,
          + private val aggregationStateType: RowTypeInfo,
          + private val precedingOffset: Int)
          + extends ProcessFunction[Row, Row] {
          +
          + Preconditions.checkNotNull(aggregates)
          + Preconditions.checkNotNull(aggFields)
          + Preconditions.checkArgument(aggregates.length == aggFields.length)
          + Preconditions.checkNotNull(forwardedFieldCount)
          + Preconditions.checkNotNull(aggregationStateType)
          + Preconditions.checkNotNull(precedingOffset)
          +
          + private var output: Row = _
          +
          + // the state which keeps the last triggering timestamp
          + private var lastTriggeringTsState: ValueState[Long] = _
          +
          + // the state which keeps the count of data
          + private var dataCountState: ValueState[Long] = null
          +
          + // the state which used to materialize the accumulator for incremental calculation
          + private var accumulatorState: ValueState[Row] = _
          +
          + // the state which keeps all the data that are not expired.
          + // The first element (as the mapState key) of the tuple is the time stamp. Per each time stamp,
          + // the second element of tuple is a list that contains the entire data of all the rows belonging
          + // to this time stamp.
          + private var dataState: MapState[Long, JList[Row]] = _
          +
          + override def open(config: Configuration) {
          +
          + output = new Row(forwardedFieldCount + aggregates.length)
          +
          + val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
          + new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
          + lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
          +
          + val dataCountStateDescriptor =
          + new ValueStateDescriptor[Long]("dataCountState", classOf[Long])
          + dataCountState = getRuntimeContext.getState(dataCountStateDescriptor)
          +
          + val accumulatorStateDescriptor =
          + new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
          + accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
          +
          + val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
          + new MapStateDescriptor[Long, JList[Row]](
          + "dataState",
          + classOf[Long],
          + classOf[JList[Row]])
          +
          + dataState = getRuntimeContext.getMapState(mapStateDescriptor)
          +
          + }
          +
          + override def processElement(
          + input: Row,
          + ctx: ProcessFunction[Row, Row]#Context,
          + out: Collector[Row]): Unit = {
          +
          + // triggering timestamp for trigger calculation
          + val triggeringTs = ctx.timestamp
          +
          + val lastTriggeringTs = lastTriggeringTsState.value
          + // check if the data is expired, if not, save the data and register event time timer
          + if (triggeringTs > lastTriggeringTs && triggeringTs > ctx.timerService.currentWatermark) {
          + if (dataState.contains(triggeringTs)) {
          + val data = dataState.get(triggeringTs)
          + data.add(input)
          + dataState.put(triggeringTs, data)
          + } else {
          + val data = new ArrayList[Row]
          + data.add(input)
          + dataState.put(triggeringTs, data)
          + // register event time timer
          + ctx.timerService.registerEventTimeTimer(triggeringTs)
          + }

          + }
          + }
          +
          + override def onTimer(
          + timestamp: Long,
          + ctx: ProcessFunction[Row, Row]#OnTimerContext,
          + out: Collector[Row]): Unit = {
          +
          + // gets all window data from state for the calculation
          + val inputs: JList[Row] = dataState.get(timestamp)
          + if (null != inputs) {
          + var j: Int = 0
          + while (j < inputs.size) {
          + val input = inputs.get(j)
          + var accumulators = accumulatorState.value
          +
          + // initialize when first run or failover recovery per key
          + if (null == accumulators) {
          + accumulators = new Row(aggregates.length)
          + var i = 0
          + while (i < aggregates.length) {
          + accumulators.setField(i, aggregates(i).createAccumulator)
          + i += 1
          + }

          + }
          +
          + var dataCount = dataCountState.value + 1
          + dataCountState.update(dataCount)
          +
          + var lastExpiredRow: Row = null
          +
          + if (dataCount > precedingOffset) {
          + val dataTimestampIt = dataState.keys.iterator
          + var expiredDataTs: Long = Long.MaxValue
          + while (dataTimestampIt.hasNext) {
          + val dataTs = dataTimestampIt.next
          + if (dataTs < expiredDataTs) {
          + expiredDataTs = dataTs
          + }

          + }
          + val windowDataList = dataState.get(expiredDataTs)
          — End diff –

          Rename to `retractList`?

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3585#discussion_r107425551

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala —
          @@ -0,0 +1,207 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.util.{ArrayList, List => JList}
          +
          +import org.apache.flink.api.common.state._
          +import org.apache.flink.api.java.typeutils.RowTypeInfo
          +import org.apache.flink.configuration.Configuration
          +import org.apache.flink.streaming.api.functions.ProcessFunction
          +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
          +import org.apache.flink.types.Row
          +import org.apache.flink.util.{Collector, Preconditions}
          +
          +/**
          + * Process Function for ROWS clause event-time bounded OVER window
          + *
          + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
          + * used for this aggregation
          + * @param aggFields the position (in the input Row) of the input value for each aggregate
          + * @param forwardedFieldCount the count of forwarded fields.
          + * @param aggregationStateType the row type info of aggregation
          + * @param precedingOffset the preceding offset
          + */
          +class RowsClauseBoundedOverProcessFunction(
          + private val aggregates: Array[AggregateFunction[_]],
          + private val aggFields: Array[Int],
          + private val forwardedFieldCount: Int,
          + private val aggregationStateType: RowTypeInfo,
          + private val precedingOffset: Int)
          + extends ProcessFunction[Row, Row] {
          +
          + Preconditions.checkNotNull(aggregates)
          + Preconditions.checkNotNull(aggFields)
          + Preconditions.checkArgument(aggregates.length == aggFields.length)
          + Preconditions.checkNotNull(forwardedFieldCount)
          + Preconditions.checkNotNull(aggregationStateType)
          + Preconditions.checkNotNull(precedingOffset)
          +
          + private var output: Row = _
          +
          + // the state which keeps the last triggering timestamp
          + private var lastTriggeringTsState: ValueState[Long] = _
          +
          + // the state which keeps the count of data
          + private var dataCountState: ValueState[Long] = null
          +
          + // the state which used to materialize the accumulator for incremental calculation
          + private var accumulatorState: ValueState[Row] = _
          +
          + // the state which keeps all the data that are not expired.
          + // The first element (as the mapState key) of the tuple is the time stamp. Per each time stamp,
          + // the second element of tuple is a list that contains the entire data of all the rows belonging
          + // to this time stamp.
          + private var dataState: MapState[Long, JList[Row]] = _
          +
          + override def open(config: Configuration) {
          +
          + output = new Row(forwardedFieldCount + aggregates.length)
          +
          + val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
          + new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
          + lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
          +
          + val dataCountStateDescriptor =
          + new ValueStateDescriptor[Long]("dataCountState", classOf[Long])
          + dataCountState = getRuntimeContext.getState(dataCountStateDescriptor)
          +
          + val accumulatorStateDescriptor =
          + new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
          + accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
          +
          + val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
          + new MapStateDescriptor[Long, JList[Row]](
          + "dataState",
          + classOf[Long],
          + classOf[JList[Row]])
          +
          + dataState = getRuntimeContext.getMapState(mapStateDescriptor)
          +
          + }
          +
          + override def processElement(
          + input: Row,
          + ctx: ProcessFunction[Row, Row]#Context,
          + out: Collector[Row]): Unit = {
          +
          + // triggering timestamp for trigger calculation
          + val triggeringTs = ctx.timestamp
          +
          + val lastTriggeringTs = lastTriggeringTsState.value
          + // check if the data is expired, if not, save the data and register event time timer
          + if (triggeringTs > lastTriggeringTs && triggeringTs > ctx.timerService.currentWatermark) {
          + if (dataState.contains(triggeringTs)) {
          + val data = dataState.get(triggeringTs)
          + data.add(input)
          + dataState.put(triggeringTs, data)
          + } else {
          + val data = new ArrayList[Row]
          + data.add(input)
          + dataState.put(triggeringTs, data)
          + // register event time timer
          + ctx.timerService.registerEventTimeTimer(triggeringTs)
          + }

          + }
          + }
          +
          + override def onTimer(
          + timestamp: Long,
          + ctx: ProcessFunction[Row, Row]#OnTimerContext,
          + out: Collector[Row]): Unit = {
          +
          + // gets all window data from state for the calculation
          + val inputs: JList[Row] = dataState.get(timestamp)
          + if (null != inputs) {
          + var j: Int = 0
          + while (j < inputs.size) {
          + val input = inputs.get(j)
          + var accumulators = accumulatorState.value
          — End diff –

          This and the initialization can be moved out of the `while` loop
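          A rough sketch of that reordering, assuming the fields (`accumulatorState`, `aggregates`) and the `inputs` list from the quoted `onTimer`; only the position of the initialization changes, the per-row aggregation itself is elided:

          // look up / initialize the accumulator row once, before iterating over the rows
          var accumulators = accumulatorState.value
          if (accumulators == null) {
            accumulators = new Row(aggregates.length)
            var i = 0
            while (i < aggregates.length) {
              accumulators.setField(i, aggregates(i).createAccumulator)
              i += 1
            }
          }

          var j = 0
          while (j < inputs.size) {
            val input = inputs.get(j)
            // ... retract/accumulate on `accumulators` and emit for `input` ...
            j += 1
          }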

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3585#discussion_r107434657

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala —
          @@ -0,0 +1,207 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.util.{ArrayList, List => JList}
          +
          +import org.apache.flink.api.common.state._
          +import org.apache.flink.api.java.typeutils.RowTypeInfo
          +import org.apache.flink.configuration.Configuration
          +import org.apache.flink.streaming.api.functions.ProcessFunction
          +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
          +import org.apache.flink.types.Row
          +import org.apache.flink.util.{Collector, Preconditions}
          +
          +/**
          + * Process Function for ROWS clause event-time bounded OVER window
          + *
          + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
          + * used for this aggregation
          + * @param aggFields the position (in the input Row) of the input value for each aggregate
          + * @param forwardedFieldCount the count of forwarded fields.
          + * @param aggregationStateType the row type info of aggregation
          + * @param precedingOffset the preceding offset
          + */
          +class RowsClauseBoundedOverProcessFunction(
          + private val aggregates: Array[AggregateFunction[_]],
          + private val aggFields: Array[Int],
          + private val forwardedFieldCount: Int,
          + private val aggregationStateType: RowTypeInfo,
          + private val precedingOffset: Int)
          + extends ProcessFunction[Row, Row] {
          +
          + Preconditions.checkNotNull(aggregates)
          + Preconditions.checkNotNull(aggFields)
          + Preconditions.checkArgument(aggregates.length == aggFields.length)
          + Preconditions.checkNotNull(forwardedFieldCount)
          + Preconditions.checkNotNull(aggregationStateType)
          + Preconditions.checkNotNull(precedingOffset)
          +
          + private var output: Row = _
          +
          + // the state which keeps the last triggering timestamp
          + private var lastTriggeringTsState: ValueState[Long] = _
          +
          + // the state which keeps the count of data
          + private var dataCountState: ValueState[Long] = null
          +
          + // the state which used to materialize the accumulator for incremental calculation
          + private var accumulatorState: ValueState[Row] = _
          +
          + // the state which keeps all the data that are not expired.
          + // The first element (as the mapState key) of the tuple is the time stamp. Per each time stamp,
          + // the second element of tuple is a list that contains the entire data of all the rows belonging
          + // to this time stamp.
          + private var dataState: MapState[Long, JList[Row]] = _
          +
          + override def open(config: Configuration) {
          +
          + output = new Row(forwardedFieldCount + aggregates.length)
          +
          + val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
          + new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
          + lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
          +
          + val dataCountStateDescriptor =
          + new ValueStateDescriptor[Long]("dataCountState", classOf[Long])
          + dataCountState = getRuntimeContext.getState(dataCountStateDescriptor)
          +
          + val accumulatorStateDescriptor =
          + new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
          + accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
          +
          + val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
          + new MapStateDescriptor[Long, JList[Row]](
          + "dataState",
          + classOf[Long],
          + classOf[JList[Row]])
          +
          + dataState = getRuntimeContext.getMapState(mapStateDescriptor)
          +
          + }
          +
          + override def processElement(
          + input: Row,
          + ctx: ProcessFunction[Row, Row]#Context,
          + out: Collector[Row]): Unit = {
          +
          + // triggering timestamp for trigger calculation
          + val triggeringTs = ctx.timestamp
          +
          + val lastTriggeringTs = lastTriggeringTsState.value
          + // check if the data is expired, if not, save the data and register event time timer
          + if (triggeringTs > lastTriggeringTs && triggeringTs > ctx.timerService.currentWatermark) {
          + if (dataState.contains(triggeringTs)) {
          + val data = dataState.get(triggeringTs)
          + data.add(input)
          + dataState.put(triggeringTs, data)
          + } else {
          + val data = new ArrayList[Row]
          + data.add(input)
          + dataState.put(triggeringTs, data)
          + // register event time timer
          + ctx.timerService.registerEventTimeTimer(triggeringTs)
          + }

          + }
          + }
          +
          + override def onTimer(
          + timestamp: Long,
          + ctx: ProcessFunction[Row, Row]#OnTimerContext,
          + out: Collector[Row]): Unit = {
          +
          + // gets all window data from state for the calculation
          + val inputs: JList[Row] = dataState.get(timestamp)
          + if (null != inputs) {
          + var j: Int = 0
          + while (j < inputs.size) {
          + val input = inputs.get(j)
          + var accumulators = accumulatorState.value
          +
          + // initialize when first run or failover recovery per key
          + if (null == accumulators) {
          + accumulators = new Row(aggregates.length)
          + var i = 0
          + while (i < aggregates.length) {
          + accumulators.setField(i, aggregates(i).createAccumulator)
          + i += 1
          + }

          + }
          +
          + var dataCount = dataCountState.value + 1
          + dataCountState.update(dataCount)
          +
          + var lastExpiredRow: Row = null
          +
          + if (dataCount > precedingOffset) {
          + val dataTimestampIt = dataState.keys.iterator
          + var expiredDataTs: Long = Long.MaxValue
          — End diff –

          If we move to a single timer implementation, I would split the keys into two sorted lists of keys to accumulate and keys to retract. That way we have fast access to all records.
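          A fragment of what that could look like inside `onTimer`, reusing `dataState` and the `timestamp` parameter from the quoted diff; sorting the keys once per firing and all variable names are assumptions of this sketch, not the merged implementation:

          import scala.collection.JavaConverters._

          // materialize and sort the buffered timestamps once per timer call
          val sortedTs = dataState.keys.iterator.asScala.toArray.sorted
          if (sortedTs.nonEmpty) {
            val oldestTs = sortedTs.head                     // candidate whose rows may need retraction
            val tsToEmit = sortedTs.filter(_ <= timestamp)   // timestamps handled by this firing
            // retract expired rows of `oldestTs`, then accumulate and emit `tsToEmit` in order
          }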

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/3585

          Hi @fhueske, from our point of view, timer registration should be correlated to the event time of the CURRENT record, not to the currentWatermark (which is derived from the event times of PREVIOUS records). Using the previous event time (`currentWatermark + 1`) to register the timer for the CURRENT record may cause some problems IMO. I do not have many counterexamples yet, I just intuitively think this is not the correct way. One example I have is:

          Data line -> ... 5 7 4 3 2
          Watermark line -> ... 5 4 3 2 1

          In this case, assume that when the watermark is `3`, we get `4` and `7`; then we use `currentWatermark + 1` to register timer `timer(3+1)` for both `4` and `7`. When the watermark goes to `4`, timer `timer(3+1)` is triggered (then `4` and `7` will be emitted). After this, data `5` comes (while the watermark is still `4`), so we register timer `timer(4+1)` for `5`. This is incorrect, as we should always ensure data `5` is triggered before data `7`. I ran a test case and printed the results of [event time][currentWatermark] as below. I hope it helps to explain my point.
          Data:
          ```
          (1L, 1, "Hello"),
          (2L, 2, "Hello"),
          (4L, 4, "Hello"),
          (3L, 3, "Hello"),
          (5L, 3, "Hello"),
          (7L, 7, "Hello"),
          (6L, 7, "Hello"),
          (8L, 8, "Hello World"),
          (7L, 8, "Hello"),
          (5L, 5, "Hello"),
          (20L, 20, "Hello World"),
          (9L, 9, "Hello World"))
          ```
          Event time of each record & watermark (printed as newTS_currentWM[event time][currentWatermark]):
          ```
          newTS_currentWM[1][-9223372036854775808]
          newTS_currentWM[2][-1]
          newTS_currentWM[4][0]
          newTS_currentWM[3][2]
          newTS_currentWM[5][2]
          newTS_currentWM[7][3] // a data with event time 7, it will be registered timer to (3+1)
          newTS_currentWM[6][5] // a data with event time 6, it will be registered timer to (5+1)
          newTS_currentWM[8][5]
          newTS_currentWM[7][6]
          newTS_currentWM[5][6]
          newTS_currentWM[20][6]
          newTS_currentWM[9][18]
          ```

          Another important reason for using allowedLateness to configure the watermark in my design is that, from our point of view, the watermark is a semantic that is correlated to the stream lateness SLA. The user knows exactly the tuple of (SLA, allowedLateness). For instance:

          ******************************************
          A system which always ensures order:
          SLA(100%) = Watermark(eventTime)
          ******************************************
          For a Flink system processing out-of-order data, the user may have different watermarks indicating different SLA levels:
          (SLA, allowedLateness) = Watermark(eventTime - allowedLateness)
          (SLA(80%), 0) = Watermark(eventTime)
          (SLA(90%), 2) = Watermark(eventTime - 2)
          (SLA(95%), 5) = Watermark(eventTime - 5)

          We want to keep the interface that lets the user configure the allowedLateness (AssignerWithPeriodicWatermarks | AssignerWithPunctuatedWatermarks), and thereby the correlated SLA.

          What do you think?
          Thanks,
          SunJincheng

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3585

          I think you can do exactly the same with both approaches, but a single timer has the benefit of reduced overhead. If you have records `(id, ts)`

          ```
          (1, 1), (2, 2), (3, 5), WM 4, (4, 7), (5, 10), (6, 6), WM 8
          ```

          Using timestamp timers would result in
          ```
          processElement((1,1))
          processElement((2,2))
          processElement((3,5))
          onTimer(1)
          onTimer(2)
          processElement((4,7))
          processElement((5,10))
          processElement((6,6))
          onTimer(5)
          onTimer(6)
          onTimer(7)
          ```

          Here `onTimer(1)` and `onTimer(2)` (or `onTimer(5)`, `onTimer(6)`, and `onTimer(7)`) could share access to the `MapState`.

          Using a single watermark timer we would have
          ```
          processElement((1,1))
          processElement((2,2))
          processElement((3,5))
          onTimer(_) // emit all records with ts < current watermark (= 4): (1,1) and (2,2)
          processElement((4,7))
          processElement((5,10))
          processElement((6,6))
          onTimer(_) // emit all records with ts < current watermark (= 8): (3,5), (6,6), (4,7)
          ```

          Since we can also check against the current watermark in this case, we can avoid emitting records too early. It should also be possible to integrate an allowed lateness parameter into this approach in the future.
          The benefit of processing multiple rows (for different timestamps) by a single `onTimer()` call is that we can iterate the list of `Long` keys just once.
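
          For illustration, a minimal sketch of the single watermark-timer variant described above, assuming a keyed stream of `(id, ts)` tuples and leaving the actual OVER aggregation out; the class and state names are hypothetical and only show the buffering and emission pattern:
          ```
          import java.util.{ArrayList => JArrayList, List => JList}

          import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
          import org.apache.flink.configuration.Configuration
          import org.apache.flink.streaming.api.functions.ProcessFunction
          import org.apache.flink.util.Collector

          class SingleTimerBufferFunction extends ProcessFunction[(Int, Long), (Int, Long)] {

            // buffered records per event timestamp, analogous to dataState in the PR
            private var buffer: MapState[Long, JList[(Int, Long)]] = _

            override def open(config: Configuration): Unit = {
              buffer = getRuntimeContext.getMapState(
                new MapStateDescriptor[Long, JList[(Int, Long)]](
                  "buffer", classOf[Long], classOf[JList[(Int, Long)]]))
            }

            override def processElement(
                in: (Int, Long),
                ctx: ProcessFunction[(Int, Long), (Int, Long)]#Context,
                out: Collector[(Int, Long)]): Unit = {
              val ts = in._2
              // drop records that the watermark has already passed
              if (ts > ctx.timerService.currentWatermark) {
                var rows = buffer.get(ts)
                if (rows == null) rows = new JArrayList[(Int, Long)]
                rows.add(in)
                buffer.put(ts, rows)
                // a single timer per watermark advance instead of one timer per timestamp
                ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1)
              }
            }

            override def onTimer(
                timestamp: Long,
                ctx: ProcessFunction[(Int, Long), (Int, Long)]#OnTimerContext,
                out: Collector[(Int, Long)]): Unit = {
              val wm = ctx.timerService.currentWatermark
              // collect all buffered timestamps that the watermark has passed; a real
              // implementation would sort them to emit records in timestamp order
              val ready = new JArrayList[Long]
              val keyIt = buffer.keys.iterator
              while (keyIt.hasNext) {
                val ts = keyIt.next()
                if (ts <= wm) ready.add(ts)
              }
              var k = 0
              while (k < ready.size) {
                val rows = buffer.get(ready.get(k))
                var i = 0
                while (i < rows.size) {
                  out.collect(rows.get(i))
                  i += 1
                }
                buffer.remove(ready.get(k))
                k += 1
              }
              // keep a timer pending if data for a later watermark is still buffered
              val remaining = buffer.keys
              if (remaining != null && remaining.iterator.hasNext) {
                ctx.timerService.registerEventTimeTimer(wm + 1)
              }
            }
          }
          ```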

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/3585

          Hi @fhueske, I have updated the PR and moved the `processing time OVER ROWS BETWEEN x PRECEDING` part (RowsClauseProcessingBoundedOverProcessFunction) to [here](https://github.com/sunjincheng121/flink/blob/FLINK-5990-ForFLINK-5653/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseProcessingBoundedOverProcessFunction.scala).
          Thanks,
          SunJincheng

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3585#discussion_r107649062

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala —
          @@ -317,4 +397,24 @@ class SqlITCase extends StreamingWithStateTestBase {
          result.addSink(new StreamITCase.StringSink)
          env.execute()
          }

          +
          +}
          +
          +object SqlITCase {
          +
          + class TimestampWithLatenessWatermark(allowedLateness: Long)
          — End diff –

          @sunjincheng121 can you please consider changing the allowedLateness to offset:
          `class AssignerWithWatermarkOffset(watermarkOffset: Long)`
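
          For illustration, a hedged sketch of such an assigner for the `(Long, Int, String)` test records (first field is the event timestamp); the class and parameter names follow the suggestion, the rest is an assumption:
          ```
          import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
          import org.apache.flink.streaming.api.watermark.Watermark

          class AssignerWithWatermarkOffset(watermarkOffset: Long)
            extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] {

            // the first tuple field carries the event time
            override def extractTimestamp(element: (Long, Int, String), previousTimestamp: Long): Long =
              element._1

            // emit a watermark that trails the extracted timestamp by the configured offset
            override def checkAndGetNextWatermark(
                lastElement: (Long, Int, String),
                extractedTimestamp: Long): Watermark =
              new Watermark(extractedTimestamp - watermarkOffset)
          }
          ```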

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3585#discussion_r107693914

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala —
          @@ -0,0 +1,206 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.util.{ArrayList, List => JList}
          +
          +import org.apache.flink.api.common.state._
          +import org.apache.flink.api.java.typeutils.RowTypeInfo
          +import org.apache.flink.configuration.Configuration
          +import org.apache.flink.streaming.api.functions.ProcessFunction
          +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
          +import org.apache.flink.types.Row
          +import org.apache.flink.util.{Collector, Preconditions}

          +
          +/**
          + * Process Function for ROWS clause event-time bounded OVER window
          + *
          + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
          + * used for this aggregation
          + * @param aggFields the position (in the input Row) of the input value for each aggregate
          + * @param forwardedFieldCount the count of forwarded fields.
          + * @param aggregationStateType the row type info of aggregation
          + * @param precedingOffset the preceding offset
          + */
          +class RowsClauseBoundedOverProcessFunction(
          + private val aggregates: Array[AggregateFunction[_]],
          + private val aggFields: Array[Int],
          + private val forwardedFieldCount: Int,
          + private val aggregationStateType: RowTypeInfo,
          + private val precedingOffset: Long)
          + extends ProcessFunction[Row, Row] {
          +
          + Preconditions.checkNotNull(aggregates)
          + Preconditions.checkNotNull(aggFields)
          + Preconditions.checkArgument(aggregates.length == aggFields.length)
          + Preconditions.checkNotNull(forwardedFieldCount)
          + Preconditions.checkNotNull(aggregationStateType)
          + Preconditions.checkNotNull(precedingOffset)
          +
          + private var output: Row = _
          +
          + // the state which keeps the last triggering timestamp
          + private var lastTriggeringTsState: ValueState[Long] = _
          +
          + // the state which keeps the count of data
          + private var dataCountState: ValueState[Long] = null
          +
          + // the state which used to materialize the accumulator for incremental calculation
          + private var accumulatorState: ValueState[Row] = _
          +
          + // the state which keeps all the data that are not expired.
          + // The first element (as the mapState key) of the tuple is the time stamp. Per each time stamp,
          + // the second element of tuple is a list that contains the entire data of all the rows belonging
          + // to this time stamp.
          + private var dataState: MapState[Long, JList[Row]] = _
          +
          + override def open(config: Configuration) {
          +
          + output = new Row(forwardedFieldCount + aggregates.length)
          +
          + val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
          + new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
          + lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
          +
          + val dataCountStateDescriptor =
          + new ValueStateDescriptor[Long]("dataCountState", classOf[Long])
          + dataCountState = getRuntimeContext.getState(dataCountStateDescriptor)
          +
          + val accumulatorStateDescriptor =
          + new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
          + accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
          +
          + val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
          + new MapStateDescriptor[Long, JList[Row]](
          + "dataState",
          + classOf[Long],
          + classOf[JList[Row]])
          +
          + dataState = getRuntimeContext.getMapState(mapStateDescriptor)
          + }

          +
          + override def processElement(
          + input: Row,
          + ctx: ProcessFunction[Row, Row]#Context,
          + out: Collector[Row]): Unit = {
          +
          + // triggering timestamp for trigger calculation
          + val triggeringTs = ctx.timestamp
          +
          + val lastTriggeringTs = lastTriggeringTsState.value
          + // check if the data is expired, if not, save the data and register event time timer
          + if (triggeringTs > lastTriggeringTs && triggeringTs > ctx.timerService.currentWatermark) {
          + if (dataState.contains(triggeringTs)) {
          — End diff –

          we can directly call `get` and check for `null` instead of calling `contains` first. This will save us one call to the state backend.
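
          For illustration, a minimal sketch of the suggested single-read pattern, extracted into a standalone helper (the helper name and signature are hypothetical; the state and timer calls follow the PR):
          ```
          import java.util.{ArrayList, List => JList}

          import org.apache.flink.api.common.state.MapState
          import org.apache.flink.streaming.api.TimerService
          import org.apache.flink.types.Row

          object StateAccessSketch {

            // buffers a row under its timestamp with one read of the map state
            // instead of contains followed by get
            def bufferRow(
                dataState: MapState[Long, JList[Row]],
                timerService: TimerService,
                triggeringTs: Long,
                input: Row): Unit = {
              var data = dataState.get(triggeringTs)
              if (null == data) {
                data = new ArrayList[Row]
                // first row for this timestamp: register the event-time timer once
                timerService.registerEventTimeTimer(triggeringTs)
              }
              data.add(input)
              dataState.put(triggeringTs, data)
            }
          }
          ```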

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3585#discussion_r107674994

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala —
          @@ -0,0 +1,206 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.util.{ArrayList, List => JList}
          +
          +import org.apache.flink.api.common.state._
          +import org.apache.flink.api.java.typeutils.RowTypeInfo
          +import org.apache.flink.configuration.Configuration
          +import org.apache.flink.streaming.api.functions.ProcessFunction
          +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
          +import org.apache.flink.types.Row
          +import org.apache.flink.util.{Collector, Preconditions}

          +
          +/**
          + * Process Function for ROWS clause event-time bounded OVER window
          + *
          + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
          + * used for this aggregation
          + * @param aggFields the position (in the input Row) of the input value for each aggregate
          + * @param forwardedFieldCount the count of forwarded fields.
          + * @param aggregationStateType the row type info of aggregation
          + * @param precedingOffset the preceding offset
          + */
          +class RowsClauseBoundedOverProcessFunction(
          + private val aggregates: Array[AggregateFunction[_]],
          + private val aggFields: Array[Int],
          + private val forwardedFieldCount: Int,
          + private val aggregationStateType: RowTypeInfo,
          + private val precedingOffset: Long)
          + extends ProcessFunction[Row, Row] {
          +
          + Preconditions.checkNotNull(aggregates)
          + Preconditions.checkNotNull(aggFields)
          + Preconditions.checkArgument(aggregates.length == aggFields.length)
          + Preconditions.checkNotNull(forwardedFieldCount)
          + Preconditions.checkNotNull(aggregationStateType)
          + Preconditions.checkNotNull(precedingOffset)
          +
          + private var output: Row = _
          +
          + // the state which keeps the last triggering timestamp
          + private var lastTriggeringTsState: ValueState[Long] = _
          +
          + // the state which keeps the count of data
          + private var dataCountState: ValueState[Long] = null
          +
          + // the state which used to materialize the accumulator for incremental calculation
          + private var accumulatorState: ValueState[Row] = _
          +
          + // the state which keeps all the data that are not expired.
          + // The first element (as the mapState key) of the tuple is the time stamp. Per each time stamp,
          + // the second element of tuple is a list that contains the entire data of all the rows belonging
          + // to this time stamp.
          + private var dataState: MapState[Long, JList[Row]] = _
          +
          + override def open(config: Configuration) {
          +
          + output = new Row(forwardedFieldCount + aggregates.length)
          +
          +
          + val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
          + new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
          + lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
          +
          + val dataCountStateDescriptor =
          + new ValueStateDescriptor[Long]("dataCountState", classOf[Long])
          + dataCountState = getRuntimeContext.getState(dataCountStateDescriptor)
          +
          + val accumulatorStateDescriptor =
          + new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
          + accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
          +
          + val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
          + new MapStateDescriptor[Long, JList[Row]](
          + "dataState",
          + classOf[Long],
          + classOf[JList[Row]])
          — End diff –

          We should use the correct RowTypeInfo of the input type here. Otherwise, we'll have a less efficient GenericType and serialize with Kryo.
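
          For illustration, a hedged sketch of such a descriptor, assuming the input row type is handed to the function as an additional `inputRowType: RowTypeInfo` parameter (the object and method names are hypothetical):
          ```
          import java.util.{List => JList}

          import org.apache.flink.api.common.state.MapStateDescriptor
          import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
          import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
          import org.apache.flink.types.Row

          object DataStateDescriptorSketch {

            def dataStateDescriptor(inputRowType: RowTypeInfo): MapStateDescriptor[Long, JList[Row]] = {
              val keyTypeInfo = BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
              // ListTypeInfo wraps the row type, so the Row serializer is used for the
              // buffered rows instead of a Kryo-backed GenericType
              val valueTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputRowType)
              new MapStateDescriptor[Long, JList[Row]]("dataState", keyTypeInfo, valueTypeInfo)
            }
          }
          ```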

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3585#discussion_r107668380

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala —
          @@ -99,28 +100,64 @@ class DataStreamOverAggregate(
          .getFieldList
          .get(overWindow.orderKeys.getFieldCollations.get(0).getFieldIndex)
          .getValue
          -
          timeType match {
          case _: ProcTimeType =>

          - // both ROWS and RANGE clause with UNBOUNDED PRECEDING and CURRENT ROW condition.
          - if (overWindow.lowerBound.isUnbounded &&
          - overWindow.upperBound.isCurrentRow) {
          + // proc-time OVER window
          + if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
          + // non-bounded OVER window
          + createUnboundedAndCurrentRowProcessingTimeOverWindow(inputDS)
          + } else if (
          + overWindow.lowerBound.isPreceding && !overWindow.lowerBound.isUnbounded &&
          + overWindow.upperBound.isCurrentRow) {
          + // bounded OVER window
          + if (overWindow.isRows) {
          + // ROWS clause bounded OVER window
          + createRowsClauseBoundedAndCurrentRowOverWindow(inputDS)

          — End diff –

          I think this should be removed as the PR addresses event time.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3585#discussion_r107700969

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala —
          @@ -0,0 +1,206 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.util.{ArrayList, List => JList}
          +
          +import org.apache.flink.api.common.state._
          +import org.apache.flink.api.java.typeutils.RowTypeInfo
          +import org.apache.flink.configuration.Configuration
          +import org.apache.flink.streaming.api.functions.ProcessFunction
          +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
          +import org.apache.flink.types.Row
          +import org.apache.flink.util.{Collector, Preconditions}

          +
          +/**
          + * Process Function for ROWS clause event-time bounded OVER window
          + *
          + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
          + * used for this aggregation
          + * @param aggFields the position (in the input Row) of the input value for each aggregate
          + * @param forwardedFieldCount the count of forwarded fields.
          + * @param aggregationStateType the row type info of aggregation
          + * @param precedingOffset the preceding offset
          + */
          +class RowsClauseBoundedOverProcessFunction(
          + private val aggregates: Array[AggregateFunction[_]],
          + private val aggFields: Array[Int],
          + private val forwardedFieldCount: Int,
          + private val aggregationStateType: RowTypeInfo,
          + private val precedingOffset: Long)
          + extends ProcessFunction[Row, Row] {
          +
          + Preconditions.checkNotNull(aggregates)
          + Preconditions.checkNotNull(aggFields)
          + Preconditions.checkArgument(aggregates.length == aggFields.length)
          + Preconditions.checkNotNull(forwardedFieldCount)
          + Preconditions.checkNotNull(aggregationStateType)
          + Preconditions.checkNotNull(precedingOffset)
          +
          + private var output: Row = _
          +
          + // the state which keeps the last triggering timestamp
          + private var lastTriggeringTsState: ValueState[Long] = _
          +
          + // the state which keeps the count of data
          + private var dataCountState: ValueState[Long] = null
          +
          + // the state which used to materialize the accumulator for incremental calculation
          + private var accumulatorState: ValueState[Row] = _
          +
          + // the state which keeps all the data that are not expired.
          + // The first element (as the mapState key) of the tuple is the time stamp. Per each time stamp,
          + // the second element of tuple is a list that contains the entire data of all the rows belonging
          + // to this time stamp.
          + private var dataState: MapState[Long, JList[Row]] = _
          +
          + override def open(config: Configuration) {
          +
          + output = new Row(forwardedFieldCount + aggregates.length)
          +
          + val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
          + new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
          + lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
          +
          + val dataCountStateDescriptor =
          + new ValueStateDescriptor[Long]("dataCountState", classOf[Long])
          + dataCountState = getRuntimeContext.getState(dataCountStateDescriptor)
          +
          + val accumulatorStateDescriptor =
          + new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
          + accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
          +
          + val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
          + new MapStateDescriptor[Long, JList[Row]](
          + "dataState",
          + classOf[Long],
          + classOf[JList[Row]])
          +
          + dataState = getRuntimeContext.getMapState(mapStateDescriptor)
          + }

          +
          + override def processElement(
          + input: Row,
          + ctx: ProcessFunction[Row, Row]#Context,
          + out: Collector[Row]): Unit = {
          +
          + // triggering timestamp for trigger calculation
          + val triggeringTs = ctx.timestamp
          +
          + val lastTriggeringTs = lastTriggeringTsState.value
          + // check if the data is expired, if not, save the data and register event time timer
          + if (triggeringTs > lastTriggeringTs && triggeringTs > ctx.timerService.currentWatermark) {
          + if (dataState.contains(triggeringTs)) {
          + val data = dataState.get(triggeringTs)
          + data.add(input)
          + dataState.put(triggeringTs, data)
          + } else {
          + val data = new ArrayList[Row]
          + data.add(input)
          + dataState.put(triggeringTs, data)
          + // register event time timer
          + ctx.timerService.registerEventTimeTimer(triggeringTs)
          + }
          + }
          + }
          +
          + override def onTimer(
          + timestamp: Long,
          + ctx: ProcessFunction[Row, Row]#OnTimerContext,
          + out: Collector[Row]): Unit = {
          +
          + // gets all window data from state for the calculation
          + val inputs: JList[Row] = dataState.get(timestamp)
          + if (null != inputs) {
          + var accumulators = accumulatorState.value
          + var j: Int = 0
          + while (j < inputs.size) {
          + val input = inputs.get(j)
          +
          + // initialize when first run or failover recovery per key
          + if (null == accumulators) {
          + accumulators = new Row(aggregates.length)
          + var i = 0
          + while (i < aggregates.length) {
          + accumulators.setField(i, aggregates(i).createAccumulator)
          + i += 1
          + }
          + }
          +
          + var dataCount = dataCountState.value + 1
          + dataCountState.update(dataCount)
          +
          + var lastExpiredRow: Row = null
          +
          + if (dataCount > precedingOffset) {
          — End diff –

          We should move this out of the `while` loop. That way we will reuse the `windowDataList` (or better `retractList`?) across iterations. We should only get a new `retractList` if the previous became empty.

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3585#discussion_r107707653

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala —
          @@ -317,4 +397,24 @@ class SqlITCase extends StreamingWithStateTestBase

          { result.addSink(new StreamITCase.StringSink) env.execute() }

          +
          +}
          +
          +object SqlITCase {
          +
          + class TimestampWithLatenessWatermark(allowedLateness: Long)
          — End diff –

          I don't think we need this timestamp extractor. IMO, the offset should be integrated into the window operator itself and not rely on modified watermarks.
          I'd suggest first adding this OVER window operator without offset support and adding the offset later, once the other OVER window operators have been added.

          What do you think, @shaoxuan-wang and @sunjincheng121?

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3585#discussion_r107704783

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala —
          @@ -293,6 +297,82 @@ class SqlITCase extends StreamingWithStateTestBase {
          assertEquals(expected.sorted, StreamITCase.testResults.sorted)
          }

          + @Test
          + def testBoundPartitionedEventTimeWindowWithRow(): Unit = {
          + val env = StreamExecutionEnvironment.getExecutionEnvironment
          + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
          + env.setStateBackend(getStateBackend)
          + val tEnv = TableEnvironment.getTableEnvironment(env)
          + StreamITCase.clear
          +
          + val t1 = env.fromCollection(data)
          + .assignTimestampsAndWatermarks(new TimestampWithLatenessWatermark(0))
          + .toTable(tEnv).as('a, 'b, 'c)
          +
          + tEnv.registerTable("T1", t1)
          +
          + val sqlQuery = "SELECT " +
          + "c, a, " +
          + "sum(a) OVER (PARTITION BY c ORDER BY RowTime() ROWS BETWEEN 2 preceding AND CURRENT ROW)" +
          — End diff –

          please add a count aggregation
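
          For illustration, a hedged sketch of how the test query might look with a count aggregation added (column names and the registered table `T1` follow the test; the exact final query is up to the PR):
          ```
          val sqlQuery = "SELECT " +
            "c, a, " +
            "count(a) OVER (PARTITION BY c ORDER BY RowTime() ROWS BETWEEN 2 preceding AND CURRENT ROW), " +
            "sum(a) OVER (PARTITION BY c ORDER BY RowTime() ROWS BETWEEN 2 preceding AND CURRENT ROW) " +
            "FROM T1"
          ```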

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3585#discussion_r107676395

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala —
          @@ -0,0 +1,206 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.util.{ArrayList, List => JList}
          +
          +import org.apache.flink.api.common.state._
          +import org.apache.flink.api.java.typeutils.RowTypeInfo
          +import org.apache.flink.configuration.Configuration
          +import org.apache.flink.streaming.api.functions.ProcessFunction
          +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
          +import org.apache.flink.types.Row
          +import org.apache.flink.util.{Collector, Preconditions}
          +
          +/**
          + * Process Function for ROWS clause event-time bounded OVER window
          + *
          + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
          + * used for this aggregation
          + * @param aggFields the position (in the input Row) of the input value for each aggregate
          + * @param forwardedFieldCount the count of forwarded fields.
          + * @param aggregationStateType the row type info of aggregation
          + * @param precedingOffset the preceding offset
          + */
          +class RowsClauseBoundedOverProcessFunction(
          + private val aggregates: Array[AggregateFunction[_]],
          + private val aggFields: Array[Int],
          + private val forwardedFieldCount: Int,
          + private val aggregationStateType: RowTypeInfo,
          + private val precedingOffset: Long)
          + extends ProcessFunction[Row, Row] {
          +
          + Preconditions.checkNotNull(aggregates)
          + Preconditions.checkNotNull(aggFields)
          + Preconditions.checkArgument(aggregates.length == aggFields.length)
          + Preconditions.checkNotNull(forwardedFieldCount)
          + Preconditions.checkNotNull(aggregationStateType)
          + Preconditions.checkNotNull(precedingOffset)
          +
          + private var output: Row = _
          +
          + // the state which keeps the last triggering timestamp
          + private var lastTriggeringTsState: ValueState[Long] = _
          +
          + // the state which keeps the count of data
          + private var dataCountState: ValueState[Long] = null
          +
          + // the state which used to materialize the accumulator for incremental calculation
          + private var accumulatorState: ValueState[Row] = _
          +
          + // the state which keeps all the data that are not expired.
          + // The first element (as the mapState key) of the tuple is the time stamp. Per each time stamp,
          + // the second element of tuple is a list that contains the entire data of all the rows belonging
          + // to this time stamp.
          + private var dataState: MapState[Long, JList[Row]] = _
          +
          + override def open(config: Configuration) {
          +
          + output = new Row(forwardedFieldCount + aggregates.length)
          +
          +
          + val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
          + new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
          + lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
          +
          + val dataCountStateDescriptor =
          + new ValueStateDescriptor[Long]("dataCountState", classOf[Long])
          + dataCountState = getRuntimeContext.getState(dataCountStateDescriptor)
          +
          + val accumulatorStateDescriptor =
          + new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
          + accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
          +
          + val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
          + new MapStateDescriptor[Long, JList[Row]](
          + "dataState",
          + classOf[Long],
          + classOf[JList[Row]])
          — End diff –

          So this should be `new ListTypeInfo[Row](inputRowType)`
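          A minimal sketch of that suggestion (not the PR code): build the descriptor with explicit TypeInformation on both sides, assuming the function has access to the input row type under the name used in the comment, inputRowType: RowTypeInfo.

          import java.util.{List => JList}

          import org.apache.flink.api.common.state.MapStateDescriptor
          import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
          import org.apache.flink.api.java.typeutils.ListTypeInfo
          import org.apache.flink.types.Row

          // value side: a list of input rows, typed via the input row type instead of classOf
          val rowListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputRowType)

          val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
            new MapStateDescriptor[Long, JList[Row]](
              "dataState",
              BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
              rowListTypeInfo)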

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3585#discussion_r107668755

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala —
          @@ -99,28 +100,64 @@ class DataStreamOverAggregate(
          .getFieldList
          .get(overWindow.orderKeys.getFieldCollations.get(0).getFieldIndex)
          .getValue
          -
          timeType match {
          case _: ProcTimeType =>
          - // both ROWS and RANGE clause with UNBOUNDED PRECEDING and CURRENT ROW condition.
          - if (overWindow.lowerBound.isUnbounded &&
          - overWindow.upperBound.isCurrentRow) {
          + // proc-time OVER window
          + if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
          + // non-bounded OVER window
          createUnboundedAndCurrentRowProcessingTimeOverWindow(inputDS)
          + } else if (
          + overWindow.lowerBound.isPreceding && !overWindow.lowerBound.isUnbounded &&
          + overWindow.upperBound.isCurrentRow) {
          + // bounded OVER window
          + if (overWindow.isRows) {
          + // ROWS clause bounded OVER window
          + createRowsClauseBoundedAndCurrentRowOverWindow(inputDS)
          + } else {
          + // RANGE clause bounded OVER window
          + throw new TableException(
          + "RANGE clause bounded proc-time OVER window no supported yet.")
          + }
          } else {
          throw new TableException(
          - "OVER window only support ProcessingTime UNBOUNDED PRECEDING and CURRENT ROW " +
          - "condition.")
          + "proc-time OVER window only support CURRENT ROW condition.")
          }
          case _: RowTimeType =>
          - throw new TableException("OVER Window of the EventTime type is not currently supported.")
          + // row-time OVER window
          + if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
          + // non-bounded OVER window
          + if (overWindow.isRows) {
          — End diff –

          I don't think we need the rows / range distinction for unbounded-currentRow windows.
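          A rough sketch of the simplification this asks for (the helper name below is hypothetical and only illustrates the branching):

          case _: RowTimeType =>
            // row-time OVER window
            if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
              // no overWindow.isRows check: a single translation path handles
              // UNBOUNDED PRECEDING ... CURRENT ROW
              createUnboundedAndCurrentRowEventTimeOverWindow(inputDS) // hypothetical helper
            } else {
              throw new TableException("row-time OVER window condition not supported yet.")
            }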

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3585#discussion_r107667958

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala —
          @@ -99,28 +100,64 @@ class DataStreamOverAggregate(
          .getFieldList
          .get(overWindow.orderKeys.getFieldCollations.get(0).getFieldIndex)
          .getValue
          -
          timeType match {
          case _: ProcTimeType =>
          - // both ROWS and RANGE clause with UNBOUNDED PRECEDING and CURRENT ROW condition.
          - if (overWindow.lowerBound.isUnbounded &&
          - overWindow.upperBound.isCurrentRow) {
          + // proc-time OVER window
          + if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
          + // non-bounded OVER window
          createUnboundedAndCurrentRowProcessingTimeOverWindow(inputDS)
          + } else if (
          + overWindow.lowerBound.isPreceding && !overWindow.lowerBound.isUnbounded &&
          + overWindow.upperBound.isCurrentRow) {
          + // bounded OVER window
          + if (overWindow.isRows) {
          + // ROWS clause bounded OVER window
          + createRowsClauseBoundedAndCurrentRowOverWindow(inputDS)
          + } else {
          + // RANGE clause bounded OVER window
          + throw new TableException(
          + "RANGE clause bounded proc-time OVER window no supported yet.")
          + }
          } else {
          throw new TableException(
          - "OVER window only support ProcessingTime UNBOUNDED PRECEDING and CURRENT ROW " +
          - "condition.")
          + "proc-time OVER window only support CURRENT ROW condition.")
          — End diff –

          revert

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3585#discussion_r107697236

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala —
          @@ -0,0 +1,206 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.util.{ArrayList, List => JList}
          +
          +import org.apache.flink.api.common.state._
          +import org.apache.flink.api.java.typeutils.RowTypeInfo
          +import org.apache.flink.configuration.Configuration
          +import org.apache.flink.streaming.api.functions.ProcessFunction
          +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
          +import org.apache.flink.types.Row
          +import org.apache.flink.util.{Collector, Preconditions}
          +
          +/**
          + * Process Function for ROWS clause event-time bounded OVER window
          + *
          + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
          + * used for this aggregation
          + * @param aggFields the position (in the input Row) of the input value for each aggregate
          + * @param forwardedFieldCount the count of forwarded fields.
          + * @param aggregationStateType the row type info of aggregation
          + * @param precedingOffset the preceding offset
          + */
          +class RowsClauseBoundedOverProcessFunction(
          + private val aggregates: Array[AggregateFunction[_]],
          + private val aggFields: Array[Int],
          + private val forwardedFieldCount: Int,
          + private val aggregationStateType: RowTypeInfo,
          + private val precedingOffset: Long)
          + extends ProcessFunction[Row, Row] {
          +
          + Preconditions.checkNotNull(aggregates)
          + Preconditions.checkNotNull(aggFields)
          + Preconditions.checkArgument(aggregates.length == aggFields.length)
          + Preconditions.checkNotNull(forwardedFieldCount)
          + Preconditions.checkNotNull(aggregationStateType)
          + Preconditions.checkNotNull(precedingOffset)
          +
          + private var output: Row = _
          +
          + // the state which keeps the last triggering timestamp
          + private var lastTriggeringTsState: ValueState[Long] = _
          +
          + // the state which keeps the count of data
          + private var dataCountState: ValueState[Long] = null
          +
          + // the state which used to materialize the accumulator for incremental calculation
          + private var accumulatorState: ValueState[Row] = _
          +
          + // the state which keeps all the data that are not expired.
          + // The first element (as the mapState key) of the tuple is the time stamp. Per each time stamp,
          + // the second element of tuple is a list that contains the entire data of all the rows belonging
          + // to this time stamp.
          + private var dataState: MapState[Long, JList[Row]] = _
          +
          + override def open(config: Configuration) {
          +
          + output = new Row(forwardedFieldCount + aggregates.length)
          +
          +
          + val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
          + new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
          + lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
          +
          + val dataCountStateDescriptor =
          + new ValueStateDescriptor[Long]("dataCountState", classOf[Long])
          + dataCountState = getRuntimeContext.getState(dataCountStateDescriptor)
          +
          + val accumulatorStateDescriptor =
          + new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
          + accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
          +
          + val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
          + new MapStateDescriptor[Long, JList[Row]](
          + "dataState",
          + classOf[Long],
          + classOf[JList[Row]])
          +
          + dataState = getRuntimeContext.getMapState(mapStateDescriptor)
          + }

          +
          + override def processElement(
          + input: Row,
          + ctx: ProcessFunction[Row, Row]#Context,
          + out: Collector[Row]): Unit = {
          +
          + // triggering timestamp for trigger calculation
          + val triggeringTs = ctx.timestamp
          +
          + val lastTriggeringTs = lastTriggeringTsState.value
          + // check if the data is expired, if not, save the data and register event time timer
          + if (triggeringTs > lastTriggeringTs && triggeringTs > ctx.timerService.currentWatermark) {
          + if (dataState.contains(triggeringTs)) {
          + val data = dataState.get(triggeringTs)
          + data.add(input)
          + dataState.put(triggeringTs, data)
          + } else {
          + val data = new ArrayList[Row]
          + data.add(input)
          + dataState.put(triggeringTs, data)
          + // register event time timer
          + ctx.timerService.registerEventTimeTimer(triggeringTs)
          + }

          + }
          + }
          +
          + override def onTimer(
          + timestamp: Long,
          + ctx: ProcessFunction[Row, Row]#OnTimerContext,
          + out: Collector[Row]): Unit = {
          +
          + // gets all window data from state for the calculation
          + val inputs: JList[Row] = dataState.get(timestamp)
          + if (null != inputs) {
          + var accumulators = accumulatorState.value
          + var j: Int = 0
          + while (j < inputs.size) {
          + val input = inputs.get(j)
          +
          + // initialize when first run or failover recovery per key
          + if (null == accumulators) {
          + accumulators = new Row(aggregates.length)
          + var i = 0
          + while (i < aggregates.length) {
          + accumulators.setField(i, aggregates(i).createAccumulator)
          + i += 1
          + }

          + }
          +
          + var dataCount = dataCountState.value + 1
          — End diff –

          Move this to an else branch of the `(dataCount > precedingOffset)` condition
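          For illustration, a sketch of the suggested restructuring (names from the quoted diff; the retraction branch is reduced to a comment). Note the comparison becomes >= because the new row is no longer pre-counted.

          val dataCount = dataCountState.value
          var lastExpiredRow: Row = null

          if (dataCount >= precedingOffset) {
            // window already holds precedingOffset rows: one row is retracted for the
            // row that is added, so the count does not change
            // lastExpiredRow = ... (retraction lookup as in the original onTimer body)
          } else {
            // window not yet full: only in this branch does the element count grow
            dataCountState.update(dataCount + 1)
          }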

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3585#discussion_r107701187

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala —
          @@ -0,0 +1,206 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.util.{ArrayList, List => JList}
          +
          +import org.apache.flink.api.common.state._
          +import org.apache.flink.api.java.typeutils.RowTypeInfo
          +import org.apache.flink.configuration.Configuration
          +import org.apache.flink.streaming.api.functions.ProcessFunction
          +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
          +import org.apache.flink.types.Row
          +import org.apache.flink.util.{Collector, Preconditions}
          +
          +/**
          + * Process Function for ROWS clause event-time bounded OVER window
          + *
          + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
          + * used for this aggregation
          + * @param aggFields the position (in the input Row) of the input value for each aggregate
          + * @param forwardedFieldCount the count of forwarded fields.
          + * @param aggregationStateType the row type info of aggregation
          + * @param precedingOffset the preceding offset
          + */
          +class RowsClauseBoundedOverProcessFunction(
          + private val aggregates: Array[AggregateFunction[_]],
          + private val aggFields: Array[Int],
          + private val forwardedFieldCount: Int,
          + private val aggregationStateType: RowTypeInfo,
          + private val precedingOffset: Long)
          + extends ProcessFunction[Row, Row] {
          +
          + Preconditions.checkNotNull(aggregates)
          + Preconditions.checkNotNull(aggFields)
          + Preconditions.checkArgument(aggregates.length == aggFields.length)
          + Preconditions.checkNotNull(forwardedFieldCount)
          + Preconditions.checkNotNull(aggregationStateType)
          + Preconditions.checkNotNull(precedingOffset)
          +
          + private var output: Row = _
          +
          + // the state which keeps the last triggering timestamp
          + private var lastTriggeringTsState: ValueState[Long] = _
          +
          + // the state which keeps the count of data
          + private var dataCountState: ValueState[Long] = null
          +
          + // the state which used to materialize the accumulator for incremental calculation
          + private var accumulatorState: ValueState[Row] = _
          +
          + // the state which keeps all the data that are not expired.
          + // The first element (as the mapState key) of the tuple is the time stamp. Per each time stamp,
          + // the second element of tuple is a list that contains the entire data of all the rows belonging
          + // to this time stamp.
          + private var dataState: MapState[Long, JList[Row]] = _
          +
          + override def open(config: Configuration) {
          +
          + output = new Row(forwardedFieldCount + aggregates.length)
          +
          +
          + val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
          + new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
          + lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
          +
          + val dataCountStateDescriptor =
          + new ValueStateDescriptor[Long]("dataCountState", classOf[Long])
          + dataCountState = getRuntimeContext.getState(dataCountStateDescriptor)
          +
          + val accumulatorStateDescriptor =
          + new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
          + accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
          +
          + val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
          + new MapStateDescriptor[Long, JList[Row]](
          + "dataState",
          + classOf[Long],
          + classOf[JList[Row]])
          +
          + dataState = getRuntimeContext.getMapState(mapStateDescriptor)
          + }

          +
          + override def processElement(
          + input: Row,
          + ctx: ProcessFunction[Row, Row]#Context,
          + out: Collector[Row]): Unit = {
          +
          + // triggering timestamp for trigger calculation
          + val triggeringTs = ctx.timestamp
          +
          + val lastTriggeringTs = lastTriggeringTsState.value
          + // check if the data is expired, if not, save the data and register event time timer
          + if (triggeringTs > lastTriggeringTs && triggeringTs > ctx.timerService.currentWatermark) {
          + if (dataState.contains(triggeringTs)) {
          + val data = dataState.get(triggeringTs)
          + data.add(input)
          + dataState.put(triggeringTs, data)
          + } else {
          + val data = new ArrayList[Row]
          + data.add(input)
          + dataState.put(triggeringTs, data)
          + // register event time timer
          + ctx.timerService.registerEventTimeTimer(triggeringTs)
          + }

          + }
          + }
          +
          + override def onTimer(
          + timestamp: Long,
          + ctx: ProcessFunction[Row, Row]#OnTimerContext,
          + out: Collector[Row]): Unit = {
          +
          + // gets all window data from state for the calculation
          + val inputs: JList[Row] = dataState.get(timestamp)
          + if (null != inputs) {
          + var accumulators = accumulatorState.value
          + var j: Int = 0
          + while (j < inputs.size) {
          + val input = inputs.get(j)
          +
          + // initialize when first run or failover recovery per key
          + if (null == accumulators) {
          + accumulators = new Row(aggregates.length)
          + var i = 0
          + while (i < aggregates.length) {
          + accumulators.setField(i, aggregates(i).createAccumulator)
          + i += 1
          + }

          + }
          +
          + var dataCount = dataCountState.value + 1
          + dataCountState.update(dataCount)
          +
          + var lastExpiredRow: Row = null
          +
          + if (dataCount > precedingOffset) {
          — End diff –

          This logic could be moved into a private function
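          As an illustration only (hypothetical helper, not the PR code), the block guarded by the precedingOffset check could be factored out, e.g. a method that removes and returns the oldest buffered row so onTimer() can retract it:

          // assumes the caller has already checked that the window holds enough rows
          private def removeOldestRow(): Row = {
            // find the smallest buffered timestamp in dataState
            var oldestTs = Long.MaxValue
            val keyIter = dataState.keys.iterator
            while (keyIter.hasNext) {
              val ts = keyIter.next()
              if (ts < oldestTs) {
                oldestTs = ts
              }
            }
            // remove the first row stored under that timestamp
            val rows = dataState.get(oldestTs)
            val expired = rows.remove(0)
            if (rows.isEmpty) {
              dataState.remove(oldestTs)
            } else {
              dataState.put(oldestTs, rows)
            }
            expired
          }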

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3585#discussion_r107697116

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala —
          @@ -0,0 +1,206 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.util.{ArrayList, List => JList}
          +
          +import org.apache.flink.api.common.state._
          +import org.apache.flink.api.java.typeutils.RowTypeInfo
          +import org.apache.flink.configuration.Configuration
          +import org.apache.flink.streaming.api.functions.ProcessFunction
          +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
          +import org.apache.flink.types.Row
          +import org.apache.flink.util.{Collector, Preconditions}
          +
          +/**
          + * Process Function for ROWS clause event-time bounded OVER window
          + *
          + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
          + * used for this aggregation
          + * @param aggFields the position (in the input Row) of the input value for each aggregate
          + * @param forwardedFieldCount the count of forwarded fields.
          + * @param aggregationStateType the row type info of aggregation
          + * @param precedingOffset the preceding offset
          + */
          +class RowsClauseBoundedOverProcessFunction(
          + private val aggregates: Array[AggregateFunction[_]],
          + private val aggFields: Array[Int],
          + private val forwardedFieldCount: Int,
          + private val aggregationStateType: RowTypeInfo,
          + private val precedingOffset: Long)
          + extends ProcessFunction[Row, Row] {
          +
          + Preconditions.checkNotNull(aggregates)
          + Preconditions.checkNotNull(aggFields)
          + Preconditions.checkArgument(aggregates.length == aggFields.length)
          + Preconditions.checkNotNull(forwardedFieldCount)
          + Preconditions.checkNotNull(aggregationStateType)
          + Preconditions.checkNotNull(precedingOffset)
          +
          + private var output: Row = _
          +
          + // the state which keeps the last triggering timestamp
          + private var lastTriggeringTsState: ValueState[Long] = _
          +
          + // the state which keeps the count of data
          + private var dataCountState: ValueState[Long] = null
          +
          + // the state which used to materialize the accumulator for incremental calculation
          + private var accumulatorState: ValueState[Row] = _
          +
          + // the state which keeps all the data that are not expired.
          + // The first element (as the mapState key) of the tuple is the time stamp. Per each time stamp,
          + // the second element of tuple is a list that contains the entire data of all the rows belonging
          + // to this time stamp.
          + private var dataState: MapState[Long, JList[Row]] = _
          +
          + override def open(config: Configuration) {
          +
          + output = new Row(forwardedFieldCount + aggregates.length)
          +
          +
          + val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
          + new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
          + lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
          +
          + val dataCountStateDescriptor =
          + new ValueStateDescriptor[Long]("dataCountState", classOf[Long])
          + dataCountState = getRuntimeContext.getState(dataCountStateDescriptor)
          +
          + val accumulatorStateDescriptor =
          + new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
          + accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
          +
          + val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
          + new MapStateDescriptor[Long, JList[Row]](
          + "dataState",
          + classOf[Long],
          + classOf[JList[Row]])
          +
          + dataState = getRuntimeContext.getMapState(mapStateDescriptor)
          + }

          +
          + override def processElement(
          + input: Row,
          + ctx: ProcessFunction[Row, Row]#Context,
          + out: Collector[Row]): Unit = {
          +
          + // triggering timestamp for trigger calculation
          + val triggeringTs = ctx.timestamp
          +
          + val lastTriggeringTs = lastTriggeringTsState.value
          + // check if the data is expired, if not, save the data and register event time timer
          + if (triggeringTs > lastTriggeringTs && triggeringTs > ctx.timerService.currentWatermark) {
          + if (dataState.contains(triggeringTs)) {
          + val data = dataState.get(triggeringTs)
          + data.add(input)
          + dataState.put(triggeringTs, data)
          + } else {
          + val data = new ArrayList[Row]
          + data.add(input)
          + dataState.put(triggeringTs, data)
          + // register event time timer
          + ctx.timerService.registerEventTimeTimer(triggeringTs)
          + }

          + }
          + }
          +
          + override def onTimer(
          + timestamp: Long,
          + ctx: ProcessFunction[Row, Row]#OnTimerContext,
          + out: Collector[Row]): Unit = {
          +
          + // gets all window data from state for the calculation
          + val inputs: JList[Row] = dataState.get(timestamp)
          + if (null != inputs) {
          + var accumulators = accumulatorState.value
          + var j: Int = 0
          + while (j < inputs.size) {
          + val input = inputs.get(j)
          +
          + // initialize when first run or failover recovery per key
          + if (null == accumulators) {
          + accumulators = new Row(aggregates.length)
          + var i = 0
          + while (i < aggregates.length) {
          + accumulators.setField(i, aggregates(i).createAccumulator)
          + i += 1
          + }

          + }
          +
          + var dataCount = dataCountState.value + 1
          + dataCountState.update(dataCount)
          — End diff –

          update once at the end of the method.
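          A sketch of that batching (independent of the else-branch suggestion above): keep the counter in a local variable inside the loop over inputs and write it back to state a single time after the loop.

          var dataCount = dataCountState.value
          var j = 0
          while (j < inputs.size) {
            // ... per-row accumulate / retract / emit logic as in the quoted diff ...
            dataCount += 1
            j += 1
          }
          dataCountState.update(dataCount)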

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3585#discussion_r107694582

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala —
          @@ -0,0 +1,207 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
+import java.util.{ArrayList, List => JList}
+
          +import org.apache.flink.api.common.state._
          +import org.apache.flink.api.java.typeutils.RowTypeInfo
          +import org.apache.flink.configuration.Configuration
          +import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
          +/**
          + * Process Function for ROWS clause event-time bounded OVER window
          + *
          + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
          + * used for this aggregation
          + * @param aggFields the position (in the input Row) of the input value for each aggregate
          + * @param forwardedFieldCount the count of forwarded fields.
          + * @param aggregationStateType the row type info of aggregation
          + * @param precedingOffset the preceding offset
          + */
          +class RowsClauseBoundedOverProcessFunction(
          + private val aggregates: Array[AggregateFunction[_]],
          + private val aggFields: Array[Int],
          + private val forwardedFieldCount: Int,
          + private val aggregationStateType: RowTypeInfo,
          + private val precedingOffset: Int)
          + extends ProcessFunction[Row, Row] {
          +
          + Preconditions.checkNotNull(aggregates)
          + Preconditions.checkNotNull(aggFields)
          + Preconditions.checkArgument(aggregates.length == aggFields.length)
          + Preconditions.checkNotNull(forwardedFieldCount)
          + Preconditions.checkNotNull(aggregationStateType)
          + Preconditions.checkNotNull(precedingOffset)
          +
          + private var output: Row = _
          +
          + // the state which keeps the last triggering timestamp
          + private var lastTriggeringTsState: ValueState[Long] = _
          +
          + // the state which keeps the count of data
          + private var dataCountState: ValueState[Long] = null
          +
          + // the state which used to materialize the accumulator for incremental calculation
          + private var accumulatorState: ValueState[Row] = _
          +
          + // the state which keeps all the data that are not expired.
          + // The first element (as the mapState key) of the tuple is the time stamp. Per each time stamp,
          + // the second element of tuple is a list that contains the entire data of all the rows belonging
          + // to this time stamp.
          + private var dataState: MapState[Long, JList[Row]] = _
          +
+ override def open(config: Configuration) {
+
+ output = new Row(forwardedFieldCount + aggregates.length)
+
+ val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
+ new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
+ lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
+
+ val dataCountStateDescriptor =
+ new ValueStateDescriptor[Long]("dataCountState", classOf[Long])
+ dataCountState = getRuntimeContext.getState(dataCountStateDescriptor)
+
+ val accumulatorStateDescriptor =
+ new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
+ accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
+
+ val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+ new MapStateDescriptor[Long, JList[Row]](
+ "dataState",
+ classOf[Long],
+ classOf[JList[Row]])
+
+ dataState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+ }
          +
          + override def processElement(
          + input: Row,
          + ctx: ProcessFunction[Row, Row]#Context,
          + out: Collector[Row]): Unit = {
          +
          + // triggering timestamp for trigger calculation
          + val triggeringTs = ctx.timestamp
          +
          + val lastTriggeringTs = lastTriggeringTsState.value
          + // check if the data is expired, if not, save the data and register event time timer
          + if (triggeringTs > lastTriggeringTs && triggeringTs > ctx.timerService.currentWatermark) {
+ if (dataState.contains(triggeringTs)) {
+ val data = dataState.get(triggeringTs)
+ data.add(input)
+ dataState.put(triggeringTs, data)
+ } else {
          + val data = new ArrayList[Row]
          + data.add(input)
          + dataState.put(triggeringTs, data)
          + // register event time timer
          + ctx.timerService.registerEventTimeTimer(triggeringTs)
          — End diff –

          We had an offline chat about this and I agree that registering a timer for each timestamp is easier to reason about. Also, it will buffer all rows which are received with the same timestamp.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3585#discussion_r107670323

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala —
          @@ -130,32 +167,72 @@ class DataStreamOverAggregate(
          val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]

          val result: DataStream[Row] =

- // partitioned aggregation
- if (partitionKeys.nonEmpty) {
- val processFunction = AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
- namedAggregates,
- inputType)
+ // partitioned aggregation
+ if (partitionKeys.nonEmpty) {
+ val processFunction = AggregateUtil.createUnboundedProcessingOverProcessFunction(
+ namedAggregates,
+ inputType)
- inputDS
+ inputDS
 .keyBy(partitionKeys: _*)
 .process(processFunction)
 .returns(rowTypeInfo)
 .name(aggOpName)
 .asInstanceOf[DataStream[Row]]
- }
- // non-partitioned aggregation
- else {
- val processFunction = AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
- namedAggregates,
- inputType,
- false)
-
- inputDS
- .process(processFunction).setParallelism(1).setMaxParallelism(1)
- .returns(rowTypeInfo)
- .name(aggOpName)
- .asInstanceOf[DataStream[Row]]
- }
+ }
+ // non-partitioned aggregation
+ else {
+ val processFunction = AggregateUtil.createUnboundedProcessingOverProcessFunction(
+ namedAggregates,
+ inputType,
+ false)
+
+ inputDS
+ .process(processFunction).setParallelism(1).setMaxParallelism(1)
+ .returns(rowTypeInfo)
+ .name(aggOpName)
+ .asInstanceOf[DataStream[Row]]
+ }
+ result
            + }
            +
            + def createRowsClauseBoundedAndCurrentRowOverWindow(
            + inputDS: DataStream[Row],
            + isRowTimeType: Boolean = false): DataStream[Row] = {
            +
            + val overWindow: Group = logicWindow.groups.get(0)
            + val partitionKeys: Array[Int] = overWindow.keys.toArray
            + val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
            + val inputFields = (0 until inputType.getFieldCount).toArray
            +
            + val precedingOffset =
            + getLowerBoundary(logicWindow, overWindow, getInput()) + 1
            +
            + // get the output types
            + val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
            +
            + val result: DataStream[Row] =
            + // partitioned aggregation
+ if (partitionKeys.nonEmpty) {
+ val processFunction = AggregateUtil.createRowsClauseBoundedOverProcessFunction(
+ namedAggregates,
+ inputType,
+ inputFields,
+ precedingOffset,
+ isRowTimeType
+ )
+ inputDS
+ .keyBy(partitionKeys: _*)
+ .process(processFunction)
+ .returns(rowTypeInfo)
+ .name(aggOpName)
+ .asInstanceOf[DataStream[Row]]
+ }
            + // non-partitioned aggregation
            + else {
            + throw TableException(

— End diff –

          Isn't the non-partitioned case analogous if we use `NullByteKeyExtractor`?
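          A minimal sketch of that idea, assuming a hand-written constant-key selector (the class name and usage below are illustrative, not code from the PR): keying every row to the same constant key lets the keyed ProcessFunction written for the partitioned case also serve the non-partitioned case.

          import org.apache.flink.api.java.functions.KeySelector
          import org.apache.flink.types.Row

          // Routes every record to the same key, so keyed state and timers still work
          // even though the query has no PARTITION BY clause.
          class ConstantByteKeySelector extends KeySelector[Row, java.lang.Byte] {
            override def getKey(value: Row): java.lang.Byte = java.lang.Byte.valueOf(0.toByte)
          }

          // usage sketch: inputDS.keyBy(new ConstantByteKeySelector).process(processFunction)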

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3585#discussion_r107707074

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala —
@@ -293,6 +297,82 @@ class SqlITCase extends StreamingWithStateTestBase {
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
 }
          + @Test
+ def testBoundPartitionedEventTimeWindowWithRow(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setStateBackend(getStateBackend)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val t1 = env.fromCollection(data)
+ .assignTimestampsAndWatermarks(new TimestampWithLatenessWatermark(0))
+ .toTable(tEnv).as('a, 'b, 'c)
+
+ tEnv.registerTable("T1", t1)
+
+ val sqlQuery = "SELECT " +
+ "c, a, " +
+ "sum(a) OVER (PARTITION BY c ORDER BY RowTime() ROWS BETWEEN 2 preceding AND CURRENT ROW)" +
+ "from T1"
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "Hello,1,1", "Hello,2,3", "Hello,3,6", "Hello,4,9", "Hello,5,12",
+ "Hello,6,15", "Hello World,7,7", "Hello World,8,15", "Hello World,20,35")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
          +
          + @Test
          + def testBoundPartitionedEventTimeWindowWithRowWithLateEvent(): Unit = {
          +
          + val data = List(
          + (1L, 1, "Hello"),
          + (2L, 2, "Hello"),
          + (4L, 4, "Hello"),
          + (3L, 3, "Hello"),
          + (7L, 7, "Hello"),
          + (8L, 8, "Hello World"),
          + (7L, 8, "Hello"),
          + (5L, 5, "Hello"),
          + (20L, 20, "Hello World"),
          + (9L, 9, "Hello World"))
          +
          + val env = StreamExecutionEnvironment.getExecutionEnvironment
          + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
          + env.setStateBackend(getStateBackend)
          + StreamITCase.clear
          +
          + // set the parallelism to 1 such that the test elements are arrived in order. For instance,
          + // element (20L, 20, "Hello World") arrives before element (9L, 9, "Hello World").
          + env.setParallelism(1)
          +
          + val tEnv = TableEnvironment.getTableEnvironment(env)
          + StreamITCase.testResults = mutable.MutableList()
          +
          + val t1 = env.fromCollection(data)
          + .assignTimestampsAndWatermarks(new TimestampWithLatenessWatermark(2)) // allowedLateness = 2
          — End diff –

          I don't think we need to test this case. The complete logic of the OVER window is the same; only the input differs, and it is very similar to the previous test.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3585#discussion_r107701598

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala —
          @@ -0,0 +1,206 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
+import java.util.{ArrayList, List => JList}
+
          +import org.apache.flink.api.common.state._
          +import org.apache.flink.api.java.typeutils.RowTypeInfo
          +import org.apache.flink.configuration.Configuration
          +import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
          +/**
          + * Process Function for ROWS clause event-time bounded OVER window
          + *
          + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
          + * used for this aggregation
          + * @param aggFields the position (in the input Row) of the input value for each aggregate
          + * @param forwardedFieldCount the count of forwarded fields.
          + * @param aggregationStateType the row type info of aggregation
          + * @param precedingOffset the preceding offset
          + */
          +class RowsClauseBoundedOverProcessFunction(
          + private val aggregates: Array[AggregateFunction[_]],
          + private val aggFields: Array[Int],
          + private val forwardedFieldCount: Int,
          + private val aggregationStateType: RowTypeInfo,
          + private val precedingOffset: Long)
          + extends ProcessFunction[Row, Row] {
          +
          + Preconditions.checkNotNull(aggregates)
          + Preconditions.checkNotNull(aggFields)
          + Preconditions.checkArgument(aggregates.length == aggFields.length)
          + Preconditions.checkNotNull(forwardedFieldCount)
          + Preconditions.checkNotNull(aggregationStateType)
          + Preconditions.checkNotNull(precedingOffset)
          +
          + private var output: Row = _
          +
          + // the state which keeps the last triggering timestamp
          + private var lastTriggeringTsState: ValueState[Long] = _
          +
          + // the state which keeps the count of data
          + private var dataCountState: ValueState[Long] = null
          +
          + // the state which used to materialize the accumulator for incremental calculation
          + private var accumulatorState: ValueState[Row] = _
          +
          + // the state which keeps all the data that are not expired.
          + // The first element (as the mapState key) of the tuple is the time stamp. Per each time stamp,
          + // the second element of tuple is a list that contains the entire data of all the rows belonging
          + // to this time stamp.
          + private var dataState: MapState[Long, JList[Row]] = _
          +
+ override def open(config: Configuration) {
+
+ output = new Row(forwardedFieldCount + aggregates.length)
+
+ val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
+ new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
+ lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
+
+ val dataCountStateDescriptor =
+ new ValueStateDescriptor[Long]("dataCountState", classOf[Long])
+ dataCountState = getRuntimeContext.getState(dataCountStateDescriptor)
+
+ val accumulatorStateDescriptor =
+ new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
+ accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
+
+ val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+ new MapStateDescriptor[Long, JList[Row]](
+ "dataState",
+ classOf[Long],
+ classOf[JList[Row]])
+
+ dataState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+ }
          +
          + override def processElement(
          + input: Row,
          + ctx: ProcessFunction[Row, Row]#Context,
          + out: Collector[Row]): Unit = {
          +
          + // triggering timestamp for trigger calculation
          + val triggeringTs = ctx.timestamp
          +
          + val lastTriggeringTs = lastTriggeringTsState.value
          + // check if the data is expired, if not, save the data and register event time timer
          + if (triggeringTs > lastTriggeringTs && triggeringTs > ctx.timerService.currentWatermark) {
+ if (dataState.contains(triggeringTs)) {
+ val data = dataState.get(triggeringTs)
+ data.add(input)
+ dataState.put(triggeringTs, data)
+ } else {
+ val data = new ArrayList[Row]
+ data.add(input)
+ dataState.put(triggeringTs, data)
+ // register event time timer
+ ctx.timerService.registerEventTimeTimer(triggeringTs)
+ }
          + }
          + }
          +
          + override def onTimer(
          + timestamp: Long,
          + ctx: ProcessFunction[Row, Row]#OnTimerContext,
          + out: Collector[Row]): Unit = {
          +
          + // gets all window data from state for the calculation
          + val inputs: JList[Row] = dataState.get(timestamp)
          + if (null != inputs) {
          + var accumulators = accumulatorState.value
          + var j: Int = 0
          + while (j < inputs.size) {
          + val input = inputs.get(j)
          +
          + // initialize when first run or failover recovery per key
          + if (null == accumulators) {
          + accumulators = new Row(aggregates.length)
          + var i = 0
+ while (i < aggregates.length) {
+ accumulators.setField(i, aggregates(i).createAccumulator)
+ i += 1
+ }
          + }
          +
          + var dataCount = dataCountState.value + 1
          + dataCountState.update(dataCount)
          +
          + var lastExpiredRow: Row = null
          +
          + if (dataCount > precedingOffset) {
          + val dataTimestampIt = dataState.keys.iterator
          + var expiredDataTs: Long = Long.MaxValue
          + while (dataTimestampIt.hasNext) {
          + val dataTs = dataTimestampIt.next
+ if (dataTs < expiredDataTs) {
+ expiredDataTs = dataTs
+ }
          + }
          + val windowDataList = dataState.get(expiredDataTs)
          +
          + lastExpiredRow = windowDataList.get(0)
          +
          + windowDataList.remove(0)
          +
          + if (windowDataList.size > 0) {
          + dataState.put(expiredDataTs, windowDataList)
          — End diff –

          We should only update the state (MapState.put) at the end of the method or remove if the `retractList` became empty.
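          A minimal sketch of that suggestion (the helper name and signature are assumptions made for illustration, not the PR's code): scan the map state for the oldest timestamp, evict one row, and touch the state exactly once at the end, removing the entry when its list becomes empty.

          import java.util.{List => JList}
          import org.apache.flink.api.common.state.MapState
          import org.apache.flink.types.Row

          // Evicts the oldest buffered row and writes the map state back only once:
          // either remove the now-empty entry or put the shrunk list, never both.
          def evictOldestRow(dataState: MapState[Long, JList[Row]]): Option[Row] = {
            val it = dataState.keys.iterator
            var oldestTs = Long.MaxValue
            while (it.hasNext) {
              val ts = it.next()
              if (ts < oldestTs) oldestTs = ts
            }
            if (oldestTs == Long.MaxValue) {
              None
            } else {
              val rows = dataState.get(oldestTs)
              val evicted = rows.remove(0)
              if (rows.isEmpty) dataState.remove(oldestTs) else dataState.put(oldestTs, rows)
              Some(evicted)
            }
          }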

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3585#discussion_r107672464

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala —
          @@ -91,6 +91,55 @@ object AggregateUtil {
          }

          /**
          + * Create an [[org.apache.flink.streaming.api.functions.ProcessFunction]] for ROWS clause
          + * bounded OVER window to evaluate final aggregate value.
          + *
          + * @param namedAggregates List of calls to aggregate functions and their output field names
          + * @param inputType Input row type
          + * @param inputFields All input fields
          + * @param precedingOffset the preceding offset
          + * @param isRowTimeType It is a tag that indicates whether the time type is rowTimeType
          + * @param isPartitioned It is a tag that indicates whether the data has partitioned
          + * @return [[org.apache.flink.streaming.api.functions.ProcessFunction]]
          + */
          + private[flink] def createRowsClauseBoundedOverProcessFunction(
          + namedAggregates: Seq[CalcitePair[AggregateCall, String]],
          + inputType: RelDataType,
          + inputFields: Array[Int],
          + precedingOffset: Long,
          + isRowTimeType: Boolean,
          + isPartitioned: Boolean = true): ProcessFunction[Row, Row] = {
          +
          + val (aggFields, aggregates) =
          + transformToAggregateFunctions(
          + namedAggregates.map(_.getKey),
          + inputType,
          + needRetraction = true)
          +
          + val aggregationStateType: RowTypeInfo =
          + createDataSetAggregateBufferDataType(Array(), aggregates, inputType)
          +
          + if (isPartitioned) {
          — End diff –

          Do we need to distinguish this case?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3585#discussion_r107706443

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala —
@@ -293,6 +297,82 @@ class SqlITCase extends StreamingWithStateTestBase {
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
 }
          + @Test
          + def testBoundPartitionedEventTimeWindowWithRow(): Unit = {
          + val env = StreamExecutionEnvironment.getExecutionEnvironment
          + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
          + env.setStateBackend(getStateBackend)
          + val tEnv = TableEnvironment.getTableEnvironment(env)
          + StreamITCase.clear
          +
          + val t1 = env.fromCollection(data)
          + .assignTimestampsAndWatermarks(new TimestampWithLatenessWatermark(0))
          — End diff –

          I would suggest to implement a utility `SourceFunction` similar to the ones which are defined inline in PR #3386.
          The `SourceFunction` could have a `Seq[Either[(Long, Tuple), Long]]` as input, i.e., either tuples with timestamp or a watermark timestamp.

          This would be useful for many more event-time tests.
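          A rough sketch of such a source (the class name, the element type, and the use of Row instead of a Tuple are assumptions for illustration): Left carries a timestamped record, Right carries a watermark timestamp.

          import org.apache.flink.streaming.api.functions.source.SourceFunction
          import org.apache.flink.streaming.api.watermark.Watermark
          import org.apache.flink.types.Row

          // Replays a fixed sequence of timestamped rows and explicit watermarks, which
          // makes event-time behavior (including late elements) deterministic in tests.
          class EventTimeTestSource(data: Seq[Either[(Long, Row), Long]]) extends SourceFunction[Row] {

            override def run(ctx: SourceFunction.SourceContext[Row]): Unit = {
              data.foreach {
                case Left((ts, row)) => ctx.collectWithTimestamp(row, ts)
                case Right(watermarkTs) => ctx.emitWatermark(new Watermark(watermarkTs))
              }
            }

            override def cancel(): Unit = {}
          }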

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3585#discussion_r107677719

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala —
          @@ -0,0 +1,207 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
+import java.util.{ArrayList, List => JList}
+
          +import org.apache.flink.api.common.state._
          +import org.apache.flink.api.java.typeutils.RowTypeInfo
          +import org.apache.flink.configuration.Configuration
          +import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
          +/**
          + * Process Function for ROWS clause event-time bounded OVER window
          + *
          + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
          + * used for this aggregation
          + * @param aggFields the position (in the input Row) of the input value for each aggregate
          + * @param forwardedFieldCount the count of forwarded fields.
          + * @param aggregationStateType the row type info of aggregation
          + * @param precedingOffset the preceding offset
          + */
          +class RowsClauseBoundedOverProcessFunction(
          + private val aggregates: Array[AggregateFunction[_]],
          + private val aggFields: Array[Int],
          + private val forwardedFieldCount: Int,
          + private val aggregationStateType: RowTypeInfo,
          + private val precedingOffset: Int)
          + extends ProcessFunction[Row, Row] {
          +
          + Preconditions.checkNotNull(aggregates)
          + Preconditions.checkNotNull(aggFields)
          + Preconditions.checkArgument(aggregates.length == aggFields.length)
          + Preconditions.checkNotNull(forwardedFieldCount)
          + Preconditions.checkNotNull(aggregationStateType)
          + Preconditions.checkNotNull(precedingOffset)
          +
          + private var output: Row = _
          +
          + // the state which keeps the last triggering timestamp
          + private var lastTriggeringTsState: ValueState[Long] = _
          +
          + // the state which keeps the count of data
          + private var dataCountState: ValueState[Long] = null
          +
          + // the state which used to materialize the accumulator for incremental calculation
          + private var accumulatorState: ValueState[Row] = _
          +
          + // the state which keeps all the data that are not expired.
          + // The first element (as the mapState key) of the tuple is the time stamp. Per each time stamp,
          + // the second element of tuple is a list that contains the entire data of all the rows belonging
          + // to this time stamp.
          + private var dataState: MapState[Long, JList[Row]] = _
          +
+ override def open(config: Configuration) {
+
+ output = new Row(forwardedFieldCount + aggregates.length)
+
+ val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
+ new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
+ lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
+
+ val dataCountStateDescriptor =
+ new ValueStateDescriptor[Long]("dataCountState", classOf[Long])
+ dataCountState = getRuntimeContext.getState(dataCountStateDescriptor)
+
+ val accumulatorStateDescriptor =
+ new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
+ accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
+
+ val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+ new MapStateDescriptor[Long, JList[Row]](
+ "dataState",
+ classOf[Long],
+ classOf[JList[Row]])
+
+ dataState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+ }
          +
          + override def processElement(
          + input: Row,
          + ctx: ProcessFunction[Row, Row]#Context,
          + out: Collector[Row]): Unit = {
          +
          + // triggering timestamp for trigger calculation
          + val triggeringTs = ctx.timestamp
          +
          + val lastTriggeringTs = lastTriggeringTsState.value
          + // check if the data is expired, if not, save the data and register event time timer
          + if (triggeringTs > lastTriggeringTs && triggeringTs > ctx.timerService.currentWatermark) {
          — End diff –

          `lastTriggeringTsState` is always smaller than the current watermark (otherwise the timer would not have been triggered).

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3585#discussion_r107842352

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala —
          @@ -99,28 +100,64 @@ class DataStreamOverAggregate(
          .getFieldList
          .get(overWindow.orderKeys.getFieldCollations.get(0).getFieldIndex)
          .getValue
          -
          timeType match {
          case _: ProcTimeType =>

- // both ROWS and RANGE clause with UNBOUNDED PRECEDING and CURRENT ROW condition.
- if (overWindow.lowerBound.isUnbounded &&
- overWindow.upperBound.isCurrentRow) {
+ // proc-time OVER window
+ if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
+ // non-bounded OVER window
 createUnboundedAndCurrentRowProcessingTimeOverWindow(inputDS)
+ } else if (
+ overWindow.lowerBound.isPreceding && !overWindow.lowerBound.isUnbounded &&
+ overWindow.upperBound.isCurrentRow) {
+ // bounded OVER window
+ if (overWindow.isRows) {
+ // ROWS clause bounded OVER window
+ createRowsClauseBoundedAndCurrentRowOverWindow(inputDS)
+ } else {
+ // RANGE clause bounded OVER window
+ throw new TableException(
+ "RANGE clause bounded proc-time OVER window no supported yet.")
+ }
+ }
 else {
 throw new TableException(
- "OVER window only support ProcessingTime UNBOUNDED PRECEDING and CURRENT ROW " +
- "condition.")
+ "proc-time OVER window only support CURRENT ROW condition.")
 }
 case _: RowTimeType =>
- throw new TableException("OVER Window of the EventTime type is not currently supported.")
+ // row-time OVER window
+ if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
+ // non-bounded OVER window
+ if (overWindow.isRows) {
— End diff –

          No, row-time OVER ROWS and OVER RANGE have different semantics. I think this does not affect the feature covered by the current JIRA. Can I explain it in FLINK-5658 later today?

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/3585

          Hi @fhueske, thanks a lot for your suggestions. Most of the comments make sense to me, and I have updated the PR. There is one thing I'm not sure about: the implementation of the `SourceFunction`. Please take a close look at it, and feel free to give me feedback if it's incorrect.

          Thanks,
          SunJincheng

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3585#discussion_r107849107

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala —
          @@ -99,28 +100,64 @@ class DataStreamOverAggregate(
                 .getFieldList
                 .get(overWindow.orderKeys.getFieldCollations.get(0).getFieldIndex)
                 .getValue
          -
               timeType match {
                 case _: ProcTimeType =>
          -        // both ROWS and RANGE clause with UNBOUNDED PRECEDING and CURRENT ROW condition.
          -        if (overWindow.lowerBound.isUnbounded &&
          -          overWindow.upperBound.isCurrentRow) {
          +        // proc-time OVER window
          +        if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
          +          // non-bounded OVER window
                     createUnboundedAndCurrentRowProcessingTimeOverWindow(inputDS)
          +        } else if (
          +          overWindow.lowerBound.isPreceding && !overWindow.lowerBound.isUnbounded &&
          +          overWindow.upperBound.isCurrentRow) {
          +          // bounded OVER window
          +          if (overWindow.isRows) {
          +            // ROWS clause bounded OVER window
          +            createRowsClauseBoundedAndCurrentRowOverWindow(inputDS)
          +          } else {
          +            // RANGE clause bounded OVER window
          +            throw new TableException(
          +              "RANGE clause bounded proc-time OVER window no supported yet.")
          +          }
          +        } else {
                     throw new TableException(
          -            "OVER window only support ProcessingTime UNBOUNDED PRECEDING and CURRENT ROW " +
          -            "condition.")
          +            "proc-time OVER window only support CURRENT ROW condition.")
                   }
                 case _: RowTimeType =>
          -        throw new TableException("OVER Window of the EventTime type is not currently supported.")
          +        // row-time OVER window
          +        if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
          +          // non-bounded OVER window
          +          if (overWindow.isRows) {
          — End diff –

          You're right, it is not directly related to this PR. Let's discuss this in FLINK-5658. Thanks

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3585#discussion_r107880632

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala —
          @@ -293,6 +297,107 @@ class SqlITCase extends StreamingWithStateTestBase {

               assertEquals(expected.sorted, StreamITCase.testResults.sorted)
             }

          +  @Test
          +  def testBoundPartitionedEventTimeWindowWithRow(): Unit = {
          +    val data = Seq(
          — End diff –

          I think the test data should be a bit more diverse: multiple records per timestamp and key, out-of-order events, and multiple timestamps per watermark.
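          For illustration only, a sketch of a more diverse data set using the `Seq[Either[(Long, T), Long]]` encoding of the PR's `EventTimeSourceFunction`; the concrete values and the `(Long, Int, String)` record layout are made up for this example, not taken from the PR:

          ```
          val data: Seq[Either[(Long, (Long, Int, String)), Long]] = Seq(
            // two records with the same timestamp and key
            Left((1L, (1L, 1, "Hello"))),
            Left((1L, (1L, 2, "Hello"))),
            // out-of-order record: timestamp 2 arrives after timestamp 3
            Left((3L, (3L, 3, "Hello"))),
            Left((2L, (2L, 4, "Hello"))),
            // a single watermark that covers several timestamps (1, 2 and 3)
            Right(3L),
            Left((4L, (4L, 5, "Hello world"))),
            Right(5L)
          )
          ```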

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3585#discussion_r107879890

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala —
          @@ -317,4 +422,24 @@ class SqlITCase extends StreamingWithStateTestBase {

               result.addSink(new StreamITCase.StringSink)
               env.execute()
             }

          +
          +}
          +
          +object SqlITCase {
          +
          + class EventTimeSourceFunction[T](
          + dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
          + override def run(ctx: SourceContext[T]): Unit = {
          + dataWithTimestampList.foreach { x =>
          — End diff –

          can be simplified to
          ```
          dataWithTimestampList.foreach {
            case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
            case Right(w) => ctx.emitWatermark(new Watermark(w))
          }

          ```
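          A self-contained sketch of the test source with that simplification applied; the imports and the empty `cancel()` body are assumptions filled in here, since the quoted diff cuts off before them:

          ```
          import org.apache.flink.streaming.api.functions.source.SourceFunction
          import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
          import org.apache.flink.streaming.api.watermark.Watermark

          // Emits records with explicit event timestamps (Left) and watermarks (Right).
          class EventTimeSourceFunction[T](
              dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {

            override def run(ctx: SourceContext[T]): Unit = {
              dataWithTimestampList.foreach {
                case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
                case Right(w) => ctx.emitWatermark(new Watermark(w))
              }
            }

            override def cancel(): Unit = ()
          }
          ```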

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3585#discussion_r107866801

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala —
          @@ -130,32 +169,76 @@ class DataStreamOverAggregate(
               val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]

               val result: DataStream[Row] =
          -      // partitioned aggregation
          -      if (partitionKeys.nonEmpty) {
          -        val processFunction = AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
          -          namedAggregates,
          -          inputType)
          +      // partitioned aggregation
          +      if (partitionKeys.nonEmpty) {
          +        val processFunction = AggregateUtil.createUnboundedProcessingOverProcessFunction(
          +          namedAggregates,
          +          inputType)
          -        inputDS
          +        inputDS
                     .keyBy(partitionKeys: _*)
                     .process(processFunction)
                     .returns(rowTypeInfo)
                     .name(aggOpName)
                     .asInstanceOf[DataStream[Row]]
          -      }
          -      // non-partitioned aggregation
          -      else {
          -        val processFunction = AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
          -          namedAggregates,
          -          inputType,
          -          false)
          -
          -        inputDS
          -          .process(processFunction).setParallelism(1).setMaxParallelism(1)
          +      }
          +      // non-partitioned aggregation
          +      else {
          +        val processFunction = AggregateUtil.createUnboundedProcessingOverProcessFunction(
          +          namedAggregates,
          +          inputType,
          +          false)
          +
          +        inputDS
          +          .process(processFunction).setParallelism(1).setMaxParallelism(1)
          +          .returns(rowTypeInfo)
          +          .name(aggOpName)
          +          .asInstanceOf[DataStream[Row]]
          +      }
          +
          +    result
          +  }
          +
          +  def createRowsClauseBoundedAndCurrentRowOverWindow(
          +      inputDS: DataStream[Row],
          +      isRowTimeType: Boolean = false): DataStream[Row] = {
          +
          +    val overWindow ...
          +    // non-partitioned aggregation
          +    else {
          +      inputDS
          +        .keyBy(new NullByteKeySelector[Row])
          +        .process(processFunction)
          — End diff –

          `setParallelism()` and `setMaxParallelism()`
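          If the suggestion is to apply `setParallelism(1)` and `setMaxParallelism(1)` to this non-partitioned branch as well, mirroring the unbounded case earlier in the same diff, the branch could look roughly like the following sketch; all names come from the surrounding method in the diff:

          ```
          // non-partitioned aggregation: run as a single task so all rows share one state scope
          inputDS
            .keyBy(new NullByteKeySelector[Row])
            .process(processFunction)
            .setParallelism(1)
            .setMaxParallelism(1)
            .returns(rowTypeInfo)
            .name(aggOpName)
            .asInstanceOf[DataStream[Row]]
          ```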

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3585#discussion_r107877027

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala —
          @@ -0,0 +1,230 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.util.{ArrayList, List => JList}
          +
          +import org.apache.flink.api.common.state._
          +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
          +import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo}
          +import org.apache.flink.configuration.Configuration
          +import org.apache.flink.streaming.api.functions.ProcessFunction
          +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
          +import org.apache.flink.types.Row
          +import org.apache.flink.util.{Collector, Preconditions}

          +
          +/**
          + * Process Function for ROWS clause event-time bounded OVER window
          + *
          + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
          + * used for this aggregation
          + * @param aggFields the position (in the input Row) of the input value for each aggregate
          + * @param forwardedFieldCount the count of forwarded fields.
          + * @param aggregationStateType the row type info of aggregation
          + * @param inputRowType the row type info of input row
          + * @param precedingOffset the preceding offset
          + */
          +class RowsClauseBoundedOverProcessFunction(
          + private val aggregates: Array[AggregateFunction[_]],
          + private val aggFields: Array[Int],
          + private val forwardedFieldCount: Int,
          + private val aggregationStateType: RowTypeInfo,
          + private val inputRowType: RowTypeInfo,
          + private val precedingOffset: Long)
          + extends ProcessFunction[Row, Row] {
          +
          + Preconditions.checkNotNull(aggregates)
          + Preconditions.checkNotNull(aggFields)
          + Preconditions.checkArgument(aggregates.length == aggFields.length)
          + Preconditions.checkNotNull(forwardedFieldCount)
          + Preconditions.checkNotNull(aggregationStateType)
          + Preconditions.checkNotNull(precedingOffset)
          +
          + private var output: Row = _
          +
          + // the state which keeps the last triggering timestamp
          + private var lastTriggeringTsState: ValueState[Long] = _
          +
          + // the state which keeps the count of data
          + private var dataCountState: ValueState[Long] = null
          +
          + // the state which used to materialize the accumulator for incremental calculation
          + private var accumulatorState: ValueState[Row] = _
          +
          + // the state which keeps all the data that are not expired.
          + // The first element (as the mapState key) of the tuple is the time stamp. Per each time stamp,
          + // the second element of tuple is a list that contains the entire data of all the rows belonging
          + // to this time stamp.
          + private var dataState: MapState[Long, JList[Row]] = _
          +
          + override def open(config: Configuration) {
          +
          +   output = new Row(forwardedFieldCount + aggregates.length)
          +
          +   val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
          +     new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long])
          +   lastTriggeringTsState = getRuntimeContext.getState(lastTriggeringTsDescriptor)
          +
          +   val dataCountStateDescriptor =
          +     new ValueStateDescriptor[Long]("dataCountState", classOf[Long])
          +   dataCountState = getRuntimeContext.getState(dataCountStateDescriptor)
          +
          +   val accumulatorStateDescriptor =
          +     new ValueStateDescriptor[Row]("accumulatorState", aggregationStateType)
          +   accumulatorState = getRuntimeContext.getState(accumulatorStateDescriptor)
          +
          +   val keyTypeInformation: TypeInformation[Long] =
          +     BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
          +   val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputRowType)
          +
          +   val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
          +     new MapStateDescriptor[Long, JList[Row]](
          +       "dataState",
          +       keyTypeInformation,
          +       valueTypeInformation)
          +
          +   dataState = getRuntimeContext.getMapState(mapStateDescriptor)
          + }

          +
          + override def processElement(
          + input: Row,
          + ctx: ProcessFunction[Row, Row]#Context,
          + out: Collector[Row]): Unit = {
          +
          + // triggering timestamp for trigger calculation
          + val triggeringTs = ctx.timestamp
          +
          + val lastTriggeringTs = lastTriggeringTsState.value
          + // check if the data is expired, if not, save the data and register event time timer
          +
          + if (triggeringTs > lastTriggeringTs) {
          + val data = dataState.get(triggeringTs)
          + if (null != data) {
          +   data.add(input)
          +   dataState.put(triggeringTs, data)
          + } else {
          +   val data = new ArrayList[Row]
          +   data.add(input)
          +   dataState.put(triggeringTs, data)
          +   // register event time timer
          +   ctx.timerService.registerEventTimeTimer(triggeringTs)
          + }

          + }
          + }
          +
          + override def onTimer(
          + timestamp: Long,
          + ctx: ProcessFunction[Row, Row]#OnTimerContext,
          + out: Collector[Row]): Unit = {
          +
          + // gets all window data from state for the calculation
          + val inputs: JList[Row] = dataState.get(timestamp)
          +
          + if (null != inputs) {
          +
          + var accumulators = accumulatorState.value
          + var dataCount = dataCountState.value
          +
          + var retractList: JList[Row] = null
          + var retractTs: Long = Long.MaxValue
          + var j = 0
          + var i = 0
          +
          + while (j < inputs.size) {
          + val input = inputs.get(j)
          +
          + // initialize when first run or failover recovery per key
          + if (null == accumulators) {
          + accumulators = new Row(aggregates.length)
          + i = 0
          + while (i < aggregates.length) {
          +   accumulators.setField(i, aggregates(i).createAccumulator)
          +   i += 1
          + }

          + }
          +
          + var retractRow: Row = null
          +
          + if (dataCount >= precedingOffset) {
          + if (null == retractList || retractList.isEmpty) {
          + retractTs = Long.MaxValue
          + val dataTimestampIt = dataState.keys.iterator
          + while (dataTimestampIt.hasNext) {
          + val dataTs = dataTimestampIt.next
          + if (dataTs < retractTs) {
          +   retractTs = dataTs
          + }

          + }
          + retractList = dataState.get(retractTs)
          + }
          +
          + retractRow = retractList.get(0)
          + retractList.remove(0)
          — End diff –

          We should avoid `remove()`. Since `retractList` is an `ArrayList` it will do an `arraycopy` under the hood. We could rather count how many rows we have retracted.
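          A rough sketch of the counting variant; `retractCnt` is a hypothetical name, and the surrounding loop, the state fields, and the oldest-timestamp lookup are as in the diff above:

          ```
          // hypothetical counter: how many rows of the current retractList have been retracted
          var retractCnt: Int = 0

          if (dataCount >= precedingOffset) {
            if (null == retractList || retractCnt >= retractList.size) {
              // find the oldest timestamp still kept in dataState, as before
              retractTs = Long.MaxValue
              val dataTimestampIt = dataState.keys.iterator
              while (dataTimestampIt.hasNext) {
                val dataTs = dataTimestampIt.next
                if (dataTs < retractTs) {
                  retractTs = dataTs
                }
              }
              retractList = dataState.get(retractTs)
              retractCnt = 0
            }

            // read the row to retract by index instead of remove(0), avoiding the arraycopy
            retractRow = retractList.get(retractCnt)
            retractCnt += 1
          }
          ```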

          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3585

          Hide
          fhueske Fabian Hueske added a comment -

          Implemented with 7a9d39fe9f659d43bf4719a2981f6c4771ffbe48


            People

            • Assignee:
              sunjincheng121 sunjincheng
              Reporter:
              sunjincheng121 sunjincheng
            • Votes:
              0
              Watchers:
              4
