Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Table API & SQL
    • Labels:
      None

      Description

      Add Tumble group-windows for batch tables as described in FLIP-11.

        Issue Links

          Activity

          Jark Wu added a comment -

          Hi Timo Walther, Fabian Hueske, I have some questions about this issue. I know that the tumbling window can be computed by applying a MapFunction that extracts the grouping key(s) for each row and then applying a regular groupBy aggregation on all keys. But it seems that the sliding window can't be computed this way, because every row may be assigned to multiple windows.
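
          A minimal sketch of that tumbling approach in the Scala DataSet API (all names here are illustrative, not the actual Table API internals):

          ```scala
          import org.apache.flink.api.scala._

          case class Event(key: String, ts: Long, value: Int)

          // Key each row by the start timestamp of its tumbling window, then
          // aggregate per (grouping key, window start) with a regular groupBy.
          def tumblingCount(input: DataSet[Event], sizeMs: Long): DataSet[(String, Long, Int)] =
            input
              // MapFunction step: compute the window-start key for each row
              .map(e => (e.key, e.ts - (e.ts % sizeMs), 1))
              // regular groupBy aggregation on grouping key + window start
              .groupBy(0, 1)
              .sum(2)
          ```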

          One approach that comes to mind is partitioning the table on the non-window grouping keys and applying a groupReduce with a group sort on the window key. This is very similar to the approach for session windows on batch tables.

          Do you have any good ideas?

          Fabian Hueske added a comment -

          Hi Jark Wu, I see two ways to implement sliding windows for batch:

          1. Replicate the output in order to assign keys for overlapping windows. This is probably the more straightforward implementation and supports any aggregation function, but it blows up the data volume (see the sketch after this list).
          2. If the aggregation functions are combinable / pre-aggregatable, we can also find the largest tumbling window size from which the sliding windows can be assembled. This is basically the technique used to express sliding windows in plain SQL (GROUP BY + OVER clauses). For a sliding window Slide(10 minutes, 2 minutes), this would mean first computing aggregates of non-overlapping (tumbling) 2-minute windows and then assembling 5 consecutive windows into each sliding window (this could be done in a MapPartition with sorted input). The implementation could be done as an optimizer rule that splits the sliding aggregate into a tumbling aggregate and a SQL WINDOW operator. Maybe it makes sense to implement the WINDOW clause first and reuse it for sliding windows.
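
          To make option 1 concrete, here is a minimal sketch in the Scala DataSet API (illustrative names and types, not the actual implementation):

          ```scala
          import org.apache.flink.api.scala._

          case class Event(key: String, ts: Long, value: Int)

          // Option 1 (replication): emit one copy of each row per overlapping window,
          // then aggregate like a tumbling window. For Slide(10 minutes, 2 minutes)
          // every row is emitted 600000 / 120000 = 5 times, which is exactly the
          // data blow-up mentioned above.
          def slidingCountByReplication(
              input: DataSet[Event], size: Long, slide: Long): DataSet[(String, Long, Int)] =
            input
              .flatMap { e =>
                // slide-aligned starts of all windows [start, start + size) containing e.ts
                val lastStart = e.ts - (e.ts % slide)
                val firstStart = lastStart - size + slide
                (firstStart to lastStart by slide).map(start => (e.key, start, 1))
              }
              .groupBy(0, 1)
              .sum(2)
          ```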

          OK, given the complexity of sliding group-windows, I think it makes sense to split this issue into tumbling and sliding windows.
          What do you think, Jark Wu?

          Jark Wu added a comment -

          Yes, I agree to move the sliding window to a separate issue. We can discuss the implementation in more detail there.

          Option 2 is nicer but only supports combinable aggregations. Maybe we can implement approach 1 in the first version and improve it in later issues.

          Jark Wu added a comment -

          Hi guys, I moved the sliding window into FLINK-5047 and kept this issue only for the tumbling window. I suggest continuing the discussion of the sliding window implementation under FLINK-5047.

          Fabian Hueske added a comment -

          I just added a third, hybrid approach to FLINK-5047 which builds on Option 1 and the pre-aggregation of Option 2.

          Fabian Hueske added a comment -

          Thanks Jark Wu!

          Jark Wu added a comment - edited

          Hi Fabian Hueske, Timo Walther, I have proposed a design doc for this issue and made a prototype. Could you have a look at the design? Any feedback is welcome!

          ASF GitHub Bot added a comment -

          GitHub user wuchong opened a pull request:

          https://github.com/apache/flink/pull/2938

          FLINK-4692 [tableApi] Add tumbling group-windows for batch tables

          Thanks for contributing to Apache Flink. Before you open your pull request, please take the following checklist into consideration.
          If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
          In addition to going through the list, please provide a meaningful description of your changes.

          • [ ] General
          • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
          • The pull request addresses only one issue
          • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [ ] Documentation
          • Documentation has been added for new functionality
          • Old documentation affected by the pull request has been updated
          • JavaDoc for public methods has been added
          • [ ] Tests & Build
          • Functionality added by the pull request is covered by tests
          • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          This PR supports tumbling group-windows on batch tables.

          For the design doc, please refer to [this](https://docs.google.com/document/d/1lzpnNUmNzn9yuCGf1RSjHuHAWm-O_v2in7y90muXI2o/edit).

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/wuchong/flink window-batch-FLINK-4692

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2938.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2938


          commit 0f514376e021857bf5419a85f45747c9daad2faa
          Author: Jark Wu <wuchong.wc@alibaba-inc.com>
          Date: 2016-11-17T03:38:01Z

          FLINK-4692 [tableApi] Add tumbling group-windows for batch tables


          ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/2938

          I would like to shepherd this PR.

          ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/2938

          Thanks for working on this @wuchong! The code looks very good. I haven't reviewed the code entirely, but I tried to verify whether batch and stream queries return the same result. Unfortunately, this is not the case at the moment.

          I converted the stream test `org.apache.flink.api.scala.stream.table.AggregationsITCase#testEventTimeTumblingWindow` and your batch test `org.apache.flink.api.scala.batch.table.AggregationsITCase#testEventTimeTumblingGroupWindowOverTime`, but the aggregations are different.

          Could you have a look at it?

          ASF GitHub Bot added a comment -

          Github user wuchong commented on the issue:

          https://github.com/apache/flink/pull/2938

          Sure, I will look at it today, @twalthr.

          ASF GitHub Bot added a comment -

          Github user wuchong commented on the issue:

          https://github.com/apache/flink/pull/2938

          Hi @twalthr, I have tested this, and the batch and stream queries return the same result on my machine. `org.apache.flink.api.scala.stream.table.AggregationsITCase#testEventTimeTumblingWindow` and `org.apache.flink.api.scala.batch.table.AggregationsITCase#testEventTimeTumblingGroupWindowOverTime` produce different results because of different input data. If the batch input data is changed to the same data as the stream input, they return the same result. Is that the issue that was confusing you?

          BTW, I have rebased the code and fixed the conflicts.

          ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/2938

          I will look into it again. Actually, I used the same data, but maybe I made a mistake.

          ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/2938

          Here is an example to reproduce the wrong results:

          ```scala
          @Test
          def testEventTimeTumblingWindowStream(): Unit = {
            val env = StreamExecutionEnvironment.getExecutionEnvironment
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
            val tEnv = TableEnvironment.getTableEnvironment(env)
            val stream = env
              .fromCollection(data)
              .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
            val table = stream.toTable(tEnv, 'long, 'int, 'string)
            val windowedTable = table
              .groupBy('string)
              .window(Tumble over 5.milli on 'rowtime as 'w)
              .select('string, 'int.count, 'int.avg, 'w.start, 'w.end)
            val results = windowedTable.toDataStream[Row].print()
            env.execute()
          }

          @Test
          def testEventTimeTumblingWindowBatch(): Unit = {
            val env = ExecutionEnvironment.getExecutionEnvironment
            val tEnv = TableEnvironment.getTableEnvironment(env)
            val stream = env
              .fromCollection(data)
            val table = stream.toTable(tEnv, 'long, 'int, 'string)
            val windowedTable = table
              .groupBy('string)
              .window(Tumble over 5.milli on 'long as 'w)
              .select('string, 'int.count, 'int.avg, 'w.start, 'w.end)
            val results = windowedTable.toDataSet[Row].print()
          }
          ```

          It should be:
          ```
          Hello world,1,3,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01
          Hello,2,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005
          Hello world,1,3,1970-01-01 00:00:00.015,1970-01-01 00:00:00.02
          Hi,1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005
          ```

          But is:
          ```
          Hello,2,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005
          Hello world,2,3,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005
          Hi,1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005
          ```

          ASF GitHub Bot added a comment -

          Github user wuchong commented on the issue:

          https://github.com/apache/flink/pull/2938

          It's weird. I can't reproduce the wrong results in my local environment. What is the `data` in your test, @twalthr? I'm using the following data in the batch and stream tests, and the results are the same.

          ```scala
          val data = List(
          (1L, 1, "Hi"),
          (2L, 2, "Hello"),
          (4L, 2, "Hello"),
          (8L, 3, "Hello world"),
          (6L, 3, "Hello world"))
          ```

          cc @fhueske, could you help test this in your environment?

          ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/2938

          @wuchong I used the data of `org.apache.flink.table.api.scala.stream.table.AggregationsITCase`.

          ```
          val data = List(
          (1L, 1, "Hi"),
          (2L, 2, "Hello"),
          (4L, 2, "Hello"),
          (8L, 3, "Hello world"),
          (16L, 3, "Hello world"))
          ```

          ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/2938

          Hi, I ran both methods with the provided data and both compute the same result on my machine.

          ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/2938

          @fhueske and I looked into this again. It seems that the result depends on the batch ExecutionEnvironment. I used a regular environment while the test base uses a CollectionExecutionEnvironment. We don't know whether this is a problem with your implementation or a bug in the collection environment.

          ASF GitHub Bot added a comment -

          Github user wuchong commented on the issue:

          https://github.com/apache/flink/pull/2938

          Thanks @twalthr for the hint. It is a bug in my `DataSetTumbleTimeWindowAggReduceCombineFunction#combine(...)` method: the rowtime attribute is dropped when combining.

          The collection environment does not run the combine phase, but the cluster environment does. That's why we couldn't reproduce the wrong result with the test base.
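
          For context, a sketch of the kind of fix involved, with a hypothetical field layout (this is not the actual `DataSetTumbleTimeWindowAggReduceCombineFunction` code):

          ```scala
          import org.apache.flink.types.Row

          object CombineSketch {
            // Hypothetical intermediate layout: [aggregate buffers ..., rowtime],
            // with the rowtime attribute stored in the last field of the Row.
            def combine(buffer: Row, input: Row): Row = {
              // ... merge the aggregate buffers of `input` into `buffer` here ...
              // The fix: forward the rowtime field as well. Dropping it in combine
              // only shows up on cluster execution, because the collection
              // environment skips the combine phase entirely.
              val last = buffer.getArity - 1
              buffer.setField(last, input.getField(last))
              buffer
            }
          }
          ```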

          BTW, do we need to activate the cluster execution mode in Table IT cases? Currently, only the collection execution mode is activated.

          ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/2938

          Thanks for fixing the bug. The collection execution mode is intentional because we have too many IT cases at the moment. We should only test the subclasses of `FlinkRel` like `DataSetUnion`, `DataStreamAggregate`, etc. with IT cases. Once this is the case, we can re-enable the cluster mode.

          ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/2938

          It would also be good to have a set of well-selected end-to-end IT cases. But Timo is right: the previous approach of starting a mini cluster for each test case was too expensive to maintain. The collection execution is not perfect (combiners, parallelism, semantic annotations, ...), but it is sufficient for most cases.

          ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2938#discussion_r95554814

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/GroupWindowTest.scala —
          @@ -0,0 +1,336 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.api.scala.batch.table
          — End diff –

          This class can be moved to `org.apache.flink.table.api.scala.batch.table`.

          ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2938#discussion_r95549888

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala —
          @@ -0,0 +1,244 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.plan.nodes.dataset
          +
          +import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
          +import org.apache.calcite.rel.`type`.RelDataType
          +import org.apache.calcite.rel.core.AggregateCall
          +import org.apache.calcite.rel.metadata.RelMetadataQuery
          +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
          +import org.apache.flink.api.common.operators.Order
          +import org.apache.flink.api.common.typeinfo.TypeInformation
          +import org.apache.flink.api.java.DataSet
          +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
          +import org.apache.flink.table.api.BatchTableEnvironment
          +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
          +import org.apache.flink.table.calcite.FlinkTypeFactory
          +import org.apache.flink.table.plan.logical._
          +import org.apache.flink.table.plan.nodes.FlinkAggregate
          +import org.apache.flink.table.runtime.aggregate.AggregateUtil.{CalcitePair, _}
          +import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
          +import org.apache.flink.table.typeutils.TypeConverter
          +import org.apache.flink.types.Row
          +
          +import scala.collection.JavaConversions._
          +
          +/**
          + * Flink RelNode which matches along with a LogicalWindowAggregate.
          + */
          +class DataSetWindowAggregate(
          +    window: LogicalWindow,
          +    namedProperties: Seq[NamedWindowProperty],
          +    cluster: RelOptCluster,
          +    traitSet: RelTraitSet,
          +    inputNode: RelNode,
          +    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
          +    rowRelDataType: RelDataType,
          +    inputType: RelDataType,
          +    grouping: Array[Int])
          +  extends SingleRel(cluster, traitSet, inputNode)
          +  with FlinkAggregate
          +  with DataSetRel {
          +
          +  override def deriveRowType() = rowRelDataType
          +
          +  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
          +    new DataSetWindowAggregate(
          +      window,
          +      namedProperties,
          +      cluster,
          +      traitSet,
          +      inputs.get(0),
          +      namedAggregates,
          +      getRowType,
          +      inputType,
          +      grouping)
          +  }
          +
          +  override def toString: String = {
          +    s"Aggregate(${
          +      if (!grouping.isEmpty) {
          +        s"groupBy: (${groupingToString(inputType, grouping)}), "
          +      } else {
          +        ""
          +      }
          +    }window: ($window), " +
          +      s"select: (${
          +        aggregationToString(
          +          inputType,
          +          grouping,
          +          getRowType,
          +          namedAggregates,
          +          namedProperties)
          +      }))"
          +  }
          +
          +  override def explainTerms(pw: RelWriter): RelWriter = {
          +    super.explainTerms(pw)
          +      .itemIf("groupBy", groupingToString(inputType, grouping), !grouping.isEmpty)
          +      .item("window", window)
          +      .item(
          +        "select", aggregationToString(
          +          inputType,
          +          grouping,
          +          getRowType,
          +          namedAggregates,
          +          namedProperties))
          +  }
          +
          +  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
          +    val child = this.getInput
          +    val rowCnt = metadata.getRowCount(child)
          +    val rowSize = this.estimateRowSize(child.getRowType)
          +    val aggCnt = this.namedAggregates.size
          +    planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize)
          +  }
          +
          +  override def translateToPlan(
          +      tableEnv: BatchTableEnvironment,
          +      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
          +
          +    val config = tableEnv.getConfig
          +
          +    val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(
          +      tableEnv,
          +      // tell the input operator that this operator currently only supports Rows as input
          +      Some(TypeConverter.DEFAULT_ROW_TYPE))
          +
          +    val result = window match {
          +      case EventTimeTumblingGroupWindow(_, _, size) =>
          +        createEventTimeTumblingWindowDataSet(inputDS, isTimeInterval(size.resultType))
          +      case EventTimeSessionGroupWindow(_, _, _) =>
          +        throw new UnsupportedOperationException(
          +          "Event-time session windows on batch are currently not supported")
          +      case EventTimeSlidingGroupWindow(_, _, _, _) =>
          +        throw new UnsupportedOperationException(
          +          "Event-time sliding windows on batch are currently not supported")
          +      case _: ProcessingTimeGroupWindow =>
          +        throw new UnsupportedOperationException(
          +          "Processing-time tumbling windows are not supported on batch tables, " +
          +            "window on batch must declare a time attribute over which the query is evaluated.")
          +    }
          +
          +    // if the expected type is not a Row, inject a mapper to convert to the expected type
          +    expectedType match {
          +      case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
          +        val mapName = s"convert: (${getRowType.getFieldNames.toList.mkString(", ")})"
          +        result.map(
          +          getConversionMapper(
          +            config = config,
          +            nullableInput = false,
          +            inputType = resultRowTypeInfo.asInstanceOf[TypeInformation[Any]],
          +            expectedType = expectedType.get,
          +            conversionOperatorName = "DataSetWindowAggregateConversion",
          +            fieldNames = getRowType.getFieldNames
          +          ))
          +          .name(mapName)
          +      case _ => result
          +    }
          +  }
          +
          +  private def createEventTimeTumblingWindowDataSet(
          +      inputDS: DataSet[Any],
          +      isTimeWindow: Boolean)
          +    : DataSet[Any] = {
          +    val mapFunction = createDataSetWindowPrepareMapFunction(
          +      window,
          +      namedAggregates,
          +      grouping,
          +      inputType)
          +    val groupReduceFunction = createDataSetWindowAggGroupReduceFunction(
          +      window,
          +      namedAggregates,
          +      inputType,
          +      getRowType,
          +      grouping,
          +      namedProperties)
          +
          +    val mappedInput = inputDS
          +      .map(mapFunction)
          +      .name(prepareOperatorName)
          +
          +    if (isTimeWindow) {
          +      // grouped time window aggregation
          +      val mapReturnType = mapFunction.asInstanceOf[ResultTypeQueryable[Row]].getProducedType
          +      // group by grouping keys and rowtime field (the last field in the row)
          +      val groupingKeys = grouping.indices ++ Seq(mapReturnType.getArity - 1)
          +      mappedInput.asInstanceOf[DataSet[Row]]
          +        .groupBy(groupingKeys: _*)
          +        .reduceGroup(groupReduceFunction)
          +        .returns(resultRowTypeInfo)
          +        .name(aggregateOperatorName)
          +        .asInstanceOf[DataSet[Any]]
          +    } else {
          +      // count window
          +      val groupingKeys = grouping.indices.toArray
          +      if (groupingKeys.length > 0) {
          +        // grouped aggregation
          +        mappedInput.asInstanceOf[DataSet[Row]]
          +          .groupBy(groupingKeys: _*)
          +          // sort on time field, it's the one after grouping keys
          +          .sortGroup(groupingKeys.length, Order.ASCENDING)
          — End diff –

          Shouldn't this be `mapReturnType.getArity - 1`? According to the docs of `AggregateUtil#createDataSetWindowPrepareMapFunction` the time field should be at the end?

          ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2938#discussion_r95552892

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala —
          @@ -591,5 +729,27 @@ object AggregateUtil
               groupingOffsetMapping.toArray
             }
          +
          +  private def getTimeFieldPosition(timeField: Expression, inputType: RelDataType): Int = {
          +    timeField match {
          +      case ResolvedFieldReference(name, resultType) =>
          +        // get the RelDataType referenced by the time-field
          +        val relDataType = inputType.getFieldList.filter(r => name.equals(r.getName))
          — End diff –

          Do we want to be case-insensitive here?

          ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2938#discussion_r95552574

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala —
          @@ -591,5 +729,27 @@ object AggregateUtil
               groupingOffsetMapping.toArray
             }
          +
          +  private def getTimeFieldPosition(timeField: Expression, inputType: RelDataType): Int = {
          +    timeField match {
          +      case ResolvedFieldReference(name, resultType) =>
          +        // get the RelDataType referenced by the time-field
          +        val relDataType = inputType.getFieldList.filter(r => name.equals(r.getName))
          +        // should only match one
          +        if (relDataType.length == 1) {
          +          relDataType.head.getIndex
          +        } else {
          +          throw new IllegalArgumentException()
          — End diff –

          I would throw a general `TableException` and give a meaningful message here and in the following 2 exceptions.

          ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2938#discussion_r94944955

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetWindowAggregate.scala —
          @@ -0,0 +1,242 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.api.table.plan.nodes.dataset
          +
          +import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
          +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
          +import org.apache.calcite.rel.`type`.RelDataType
          +import org.apache.calcite.rel.core.AggregateCall
          +import org.apache.calcite.rel.metadata.RelMetadataQuery
          +import org.apache.flink.api.common.operators.Order
          +import org.apache.flink.api.common.typeinfo.TypeInformation
          +import org.apache.flink.api.java.DataSet
          +import org.apache.flink.api.java.typeutils.ResultTypeQueryable
          +import org.apache.flink.api.table.FlinkRelBuilder.NamedWindowProperty
          +import org.apache.flink.api.table.{BatchTableEnvironment, FlinkTypeFactory, Row}
          +import org.apache.flink.api.table.plan.logical._
          +import org.apache.flink.api.table.plan.nodes.FlinkAggregate
          +import org.apache.flink.api.table.runtime.aggregate.AggregateUtil._
          +import org.apache.flink.api.table.typeutils.TypeCheckUtils._
          +import org.apache.flink.api.table.typeutils._
          +
          +import scala.collection.JavaConversions._
          +
          +/**
          + * Flink RelNode which matches along with a LogicalWindowAggregate.
          + */
          +class DataSetWindowAggregate(
          +    window: LogicalWindow,
          +    namedProperties: Seq[NamedWindowProperty],
          +    cluster: RelOptCluster,
          +    traitSet: RelTraitSet,
          +    inputNode: RelNode,
          +    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
          +    rowRelDataType: RelDataType,
          +    inputType: RelDataType,
          +    grouping: Array[Int])
          +  extends SingleRel(cluster, traitSet, inputNode)
          +  with FlinkAggregate
          +  with DataSetRel {
          +
          +  override def deriveRowType() = rowRelDataType
          +
          +  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
          +    new DataSetWindowAggregate(
          +      window,
          +      namedProperties,
          +      cluster,
          +      traitSet,
          +      inputs.get(0),
          +      namedAggregates,
          +      getRowType,
          +      inputType,
          +      grouping)
          +  }
          +
          +  override def toString: String = {
          +    s"Aggregate(${
          +      if (!grouping.isEmpty) {
          +        s"groupBy: (${groupingToString(inputType, grouping)}), "
          +      } else {
          +        ""
          +      }
          +    }window: ($window), " +
          +      s"select: (${
          +        aggregationToString(
          +          inputType,
          +          grouping,
          +          getRowType,
          +          namedAggregates,
          +          namedProperties)
          +      }))"
          +  }
          +
          +  override def explainTerms(pw: RelWriter): RelWriter = {
          +    super.explainTerms(pw)
          +      .itemIf("groupBy", groupingToString(inputType, grouping), !grouping.isEmpty)
          +      .item("window", window)
          +      .item(
          +        "select", aggregationToString(
          +          inputType,
          +          grouping,
          +          getRowType,
          +          namedAggregates,
          +          namedProperties))
          +  }
          +
          +  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
          +    val child = this.getInput
          +    val rowCnt = metadata.getRowCount(child)
          +    val rowSize = this.estimateRowSize(child.getRowType)
          +    val aggCnt = this.namedAggregates.size
          +    planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize)
          +  }
          +
          +  override def translateToPlan(
          +      tableEnv: BatchTableEnvironment,
          +      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
          +
          +    val config = tableEnv.getConfig
          +
          +    val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(
          +      tableEnv,
          +      // tell the input operator that this operator currently only supports Rows as input
          +      Some(TypeConverter.DEFAULT_ROW_TYPE))
          +
          +    val result = window match {
          +      case EventTimeTumblingGroupWindow(_, _, size) =>
          +        createEventTimeTumblingWindowDataSet(inputDS, isTimeInterval(size.resultType))
          +      case EventTimeSessionGroupWindow(_, _, _) =>
          +        throw new UnsupportedOperationException(
          +          "Event-time session windows on batch are currently not supported")
          — End diff –

          I would use "in a batch environment" instead of "on batch" in general.

          ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2938#discussion_r95555277

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala —
          @@ -342,4 +342,85 @@ class AggregationsITCase(
          TestBaseUtils.compareResultAsText(results.asJava, expected)
          }

          + @Test(expected = classOf[UnsupportedOperationException])
          — End diff –

I created a new package for runtime IT cases. You could move the new tests into `org.apache.flink.table.runtime.dataset.DataSetWindowAggregateITCase`; a possible skeleton is sketched below.
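For orientation, the relocated IT case could look like the following skeleton. This is hypothetical: only the package, class name, and the `@Test(expected = ...)` annotation come from the review above; the method name and body are placeholders, and the real test bodies would simply be moved over from `AggregationsITCase`.

```scala
package org.apache.flink.table.runtime.dataset

import org.junit.Test

// Hypothetical skeleton only; the actual test bodies would be moved
// over from AggregationsITCase unchanged.
class DataSetWindowAggregateITCase {

  @Test(expected = classOf[UnsupportedOperationException])
  def testUnsupportedWindowOnBatch(): Unit = {
    // build a batch table, apply a not-yet-supported group window,
    // and translate the plan so that the expected exception is thrown
  }
}
```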

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2938#discussion_r95554086

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala —
          @@ -0,0 +1,93 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.lang.Iterable
          +
          +import org.apache.flink.api.common.functions.RichGroupReduceFunction
          +import org.apache.flink.configuration.Configuration
          +import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
          +
          +import scala.collection.JavaConversions._
          +
          +
          +/**
          + * It wraps the aggregate logic inside of
          + * [[org.apache.flink.api.java.operators.GroupReduceOperator]].
          + * It is only used for tumbling count-window on batch.
          + *
          + * @param windowSize Tumble count window size
          + * @param aggregates The aggregate functions.
          + * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
          + * and output Row.
          + * @param aggregateMapping The index mapping between aggregate function list and aggregated value
          + * index in output Row.
          + * @param intermediateRowArity The intermediate row field count
          + * @param finalRowArity The output row field count
          + */
          +class DataSetTumbleCountWindowAggReduceGroupFunction(
          + private val windowSize: Long,
          + private val aggregates: Array[Aggregate[_ <: Any]],
          + private val groupKeysMapping: Array[(Int, Int)],
          + private val aggregateMapping: Array[(Int, Int)],
          + private val intermediateRowArity: Int,
          + private val finalRowArity: Int)
          + extends RichGroupReduceFunction[Row, Row] {
          +
          + private var aggregateBuffer: Row = _
          + private var output: Row = _
          +
+ override def open(config: Configuration) {
+   Preconditions.checkNotNull(aggregates)
+   Preconditions.checkNotNull(groupKeysMapping)
+   aggregateBuffer = new Row(intermediateRowArity)
+   output = new Row(finalRowArity)
+ }

          +
          + override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
          +
          + var count: Long = 0
          +
          + records.foreach( (record) => {
          — End diff –

          Just a small comment here for future implementations. We should try to reduce Scala magic in runtime classes. I would use a good old while loop here. We don't know how many method calls and helper objects are created in this reducer.
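For illustration, the suggested rewrite could look like the following sketch. This is hypothetical, not the merged code: the aggregate bookkeeping of the reducer is elided, and only the iteration pattern is shown. Driving the Java `Iterator` directly means neither the `JavaConversions` wrapper nor a per-element closure is allocated.

```scala
import java.lang.{Iterable => JIterable}

import org.apache.flink.types.Row
import org.apache.flink.util.Collector

// Hypothetical sketch: aggregate buffer handling is elided.
object WhileLoopSketch {
  def reduce(records: JIterable[Row], out: Collector[Row]): Unit = {
    var count: Long = 0
    // plain while loop over the Java Iterator instead of
    // JavaConversions + foreach
    val iterator = records.iterator()
    while (iterator.hasNext) {
      val record = iterator.next()
      // ... merge `record` into the aggregate buffer as before ...
      count += 1
    }
  }
}
```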

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2938#discussion_r95552283

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala —
          @@ -517,22 +657,20 @@ object AggregateUtil {
          private def createAggregateBufferDataType(
          groupings: Array[Int],
          aggregates: Array[Aggregate[_]],

- inputType: RelDataType): RowTypeInfo = {
+ inputType: RelDataType,
+ windowKeyType: Option[TypeInformation[_]] = None): RowTypeInfo = {

          // get the field data types of group keys.
          val groupingTypes: Seq[TypeInformation[_]] = groupings
          .map(inputType.getFieldList.get(_).getType)
          .map(FlinkTypeFactory.toTypeInfo)

- val aggPartialNameSuffix = "agg_buffer_"
- val factory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT)
-
            // get all field data types of all intermediate aggregates
            val aggTypes: Seq[TypeInformation[_]] = aggregates.flatMap(_.intermediateDataType)

          // concat group key types and aggregation types
          — End diff –

          Update comment.

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2938#discussion_r94948374

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/window/batch/DataSetTumbleCountWindowAggReduceGroupFunction.scala —
          @@ -0,0 +1,94 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.api.table.runtime.aggregate.window.batch
          +
          +import java.lang.Iterable
          +
          +import org.apache.flink.api.common.functions.RichGroupReduceFunction
          +import org.apache.flink.api.table.Row
          +import org.apache.flink.api.table.runtime.aggregate.Aggregate
          +import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.{Collector, Preconditions}
          +
          +import scala.collection.JavaConversions._
          +
          +
          +/**
          + * It wraps the aggregate logic inside of
          + * [[org.apache.flink.api.java.operators.GroupReduceOperator]].
          + * It is only used for tumbling count-window on batch.
          + *
          + * @param windowSize Tumble count window size
          + * @param aggregates The aggregate functions.
          + * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
          + * and output Row.
          + * @param aggregateMapping The index mapping between aggregate function list and aggregated value
          + * index in output Row.
          + * @param intermediateRowArity The intermediate row field count
          + * @param finalRowArity The output row field count
          + */
          +class DataSetTumbleCountWindowAggReduceGroupFunction(
          + private val windowSize: Long,
          + private val aggregates: Array[Aggregate[_ <: Any]],
          + private val groupKeysMapping: Array[(Int, Int)],
          + private val aggregateMapping: Array[(Int, Int)],
          + private val intermediateRowArity: Int,
          + private val finalRowArity: Int)
          + extends RichGroupReduceFunction[Row, Row] {
          +
          + private var aggregateBuffer: Row = _
          + private var output: Row = _
          +
+ override def open(config: Configuration) {
+   Preconditions.checkNotNull(aggregates)
+   Preconditions.checkNotNull(groupKeysMapping)
+   aggregateBuffer = new Row(intermediateRowArity)
+   output = new Row(finalRowArity)
+ }

          +
          + override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
          +
          + var count: Long = 0
          +
          + records.foreach( (record) => {
          — End diff –

          I would use good old while loops here instead of Scala magic as this is runtime code. JavaConversions add unnecessary overhead.

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2938#discussion_r94943939

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/groupWindows.scala —
          @@ -107,7 +121,8 @@ case class EventTimeTumblingGroupWindow(
          super.validate(tableEnv)
          .orElse(TumblingGroupWindow.validate(tableEnv, size))
          .orElse(size match {

- case Literal(_, RowIntervalTypeInfo.INTERVAL_ROWS) =>
+ case Literal(_, RowIntervalTypeInfo.INTERVAL_ROWS)
+   if tableEnv.isInstanceOf[StreamTableEnvironment] =>
   ValidationFailure(
     "Event-time grouping windows on row intervals are currently not supported.")
— End diff –

          I would add "in a stream environment" to this and the other change.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2938#discussion_r95726648

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala —
          @@ -0,0 +1,244 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.plan.nodes.dataset
          +
+import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
+import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.plan.nodes.FlinkAggregate
+import org.apache.flink.table.runtime.aggregate.AggregateUtil.{CalcitePair, _}
+import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.table.typeutils.TypeConverter
+import org.apache.flink.types.Row
+
+import scala.collection.JavaConversions._
+
+/**
+ * Flink RelNode which matches along with a LogicalWindowAggregate.
+ */
+class DataSetWindowAggregate(
+    window: LogicalWindow,
+    namedProperties: Seq[NamedWindowProperty],
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputNode: RelNode,
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+    rowRelDataType: RelDataType,
+    inputType: RelDataType,
+    grouping: Array[Int])
+  extends SingleRel(cluster, traitSet, inputNode)
+  with FlinkAggregate
+  with DataSetRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataSetWindowAggregate(
+      window,
+      namedProperties,
+      cluster,
+      traitSet,
+      inputs.get(0),
+      namedAggregates,
+      getRowType,
+      inputType,
+      grouping)
+  }
+
+  override def toString: String = {
+    s"Aggregate(${
+      if (!grouping.isEmpty) {
+        s"groupBy: (${groupingToString(inputType, grouping)}), "
+      } else {
+        ""
+      }
+    }window: ($window), " +
+      s"select: (${
+        aggregationToString(
+          inputType,
+          grouping,
+          getRowType,
+          namedAggregates,
+          namedProperties)
+      }))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .itemIf("groupBy", groupingToString(inputType, grouping), !grouping.isEmpty)
+      .item("window", window)
+      .item(
+        "select", aggregationToString(
+          inputType,
+          grouping,
+          getRowType,
+          namedAggregates,
+          namedProperties))
+  }
+
+  override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
+    val child = this.getInput
+    val rowCnt = metadata.getRowCount(child)
+    val rowSize = this.estimateRowSize(child.getRowType)
+    val aggCnt = this.namedAggregates.size
+    planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize)
+  }
+
+  override def translateToPlan(
+      tableEnv: BatchTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+
+    val config = tableEnv.getConfig
+
+    val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(
+      tableEnv,
+      // tell the input operator that this operator currently only supports Rows as input
+      Some(TypeConverter.DEFAULT_ROW_TYPE))
+
+    val result = window match {
+      case EventTimeTumblingGroupWindow(_, _, size) =>
+        createEventTimeTumblingWindowDataSet(inputDS, isTimeInterval(size.resultType))
+      case EventTimeSessionGroupWindow(_, _, _) =>
+        throw new UnsupportedOperationException(
+          "Event-time session windows on batch are currently not supported")
+      case EventTimeSlidingGroupWindow(_, _, _, _) =>
+        throw new UnsupportedOperationException(
+          "Event-time sliding windows on batch are currently not supported")
+      case _: ProcessingTimeGroupWindow =>
+        throw new UnsupportedOperationException(
+          "Processing-time tumbling windows are not supported on batch tables, " +
+            "window on batch must declare a time attribute over which the query is evaluated.")
+    }
+
+    // if the expected type is not a Row, inject a mapper to convert to the expected type
+    expectedType match {
+      case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
+        val mapName = s"convert: (${getRowType.getFieldNames.toList.mkString(", ")})"
+        result.map(
+          getConversionMapper(
+            config = config,
+            nullableInput = false,
+            inputType = resultRowTypeInfo.asInstanceOf[TypeInformation[Any]],
+            expectedType = expectedType.get,
+            conversionOperatorName = "DataSetWindowAggregateConversion",
+            fieldNames = getRowType.getFieldNames
+          ))
+          .name(mapName)
+      case _ => result
+    }
+  }
+
+  private def createEventTimeTumblingWindowDataSet(
+      inputDS: DataSet[Any],
+      isTimeWindow: Boolean)
+    : DataSet[Any] = {
+    val mapFunction = createDataSetWindowPrepareMapFunction(
+      window,
+      namedAggregates,
+      grouping,
+      inputType)
+    val groupReduceFunction = createDataSetWindowAggGroupReduceFunction(
+      window,
+      namedAggregates,
+      inputType,
+      getRowType,
+      grouping,
+      namedProperties)
+
+    val mappedInput = inputDS
+      .map(mapFunction)
+      .name(prepareOperatorName)
+
+    if (isTimeWindow) {
+      // grouped time window aggregation
+      val mapReturnType = mapFunction.asInstanceOf[ResultTypeQueryable[Row]].getProducedType
+      // group by grouping keys and rowtime field (the last field in the row)
+      val groupingKeys = grouping.indices ++ Seq(mapReturnType.getArity - 1)
+      mappedInput.asInstanceOf[DataSet[Row]]
+        .groupBy(groupingKeys: _*)
+        .reduceGroup(groupReduceFunction)
+        .returns(resultRowTypeInfo)
+        .name(aggregateOperatorName)
+        .asInstanceOf[DataSet[Any]]
+    } else {
+      // count window
+      val groupingKeys = grouping.indices.toArray
+      if (groupingKeys.length > 0) {
+        // grouped aggregation
+        mappedInput.asInstanceOf[DataSet[Row]]
+          .groupBy(groupingKeys: _*)
+          // sort on time field, it's the one after grouping keys
+          .sortGroup(groupingKeys.length, Order.ASCENDING)
— End diff –

Good catch! It should be the last field in the Row.
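One plausible shape of that fix, reusing the names from the diff above (a sketch of the fragment, not necessarily the merged change): read the arity of the prepared row from the map function's produced type and sort on its last field instead of the field right after the grouping keys.

```scala
// Hypothetical sketch of the corrected count-window branch: the time/count
// field is appended as the *last* field of the prepared Row, so the group
// sort must target arity - 1 rather than groupingKeys.length.
val mapReturnType = mapFunction.asInstanceOf[ResultTypeQueryable[Row]].getProducedType
mappedInput.asInstanceOf[DataSet[Row]]
  .groupBy(groupingKeys: _*)
  .sortGroup(mapReturnType.getArity - 1, Order.ASCENDING)
  .reduceGroup(groupReduceFunction)
```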

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2938#discussion_r95728469

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala —
@@ -591,5 +729,27 @@ object AggregateUtil {
    groupingOffsetMapping.toArray
  }

          +
          + private def getTimeFieldPosition(timeField: Expression, inputType: RelDataType): Int = {
          + timeField match {
          + case ResolvedFieldReference(name, resultType) =>
          + // get the RelDataType referenced by the time-field
          + val relDataType = inputType.getFieldList.filter(r => name.equals(r.getName))
          — End diff –

We set `Lex.JAVA` as the default lexical policy, which is case-sensitive. I will change this logic to handle both case-sensitive and case-insensitive matching, so that it still works when users customize the lexical policy.
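A sketch of what such a case-aware lookup could look like (a hypothetical helper, not the merged code; the `caseSensitive` flag would be derived from the configured lexical policy, e.g. `Lex.JAVA` implies `true`):

```scala
import org.apache.calcite.rel.`type`.RelDataType

import scala.collection.JavaConversions._

// Hypothetical helper: resolves the position of the time field while
// honoring the configured case (in)sensitivity of the lexical policy.
object TimeFieldLookupSketch {
  def getTimeFieldPosition(
      fieldName: String,
      inputType: RelDataType,
      caseSensitive: Boolean): Int = {
    val matching = inputType.getFieldList.filter { f =>
      if (caseSensitive) fieldName.equals(f.getName)
      else fieldName.equalsIgnoreCase(f.getName)
    }
    matching.toList match {
      case field :: Nil => field.getIndex
      case Nil =>
        throw new IllegalArgumentException(
          s"Time field '$fieldName' not found in input type.")
      case _ =>
        throw new IllegalArgumentException(
          s"Time field '$fieldName' is ambiguous in input type.")
    }
  }
}
```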

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on the issue:

          https://github.com/apache/flink/pull/2938

Hi @twalthr, thanks for your review. I have addressed your comments.

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/2938

          Thanks for the update @wuchong. I will test it again and merge it.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/2938

          twalthr Timo Walther added a comment -

          Fixed in 1.3.0: 88576a0eebafde06fe63688d9c4c07ea67fb6266


People

• Assignee: jark Jark Wu
• Reporter: twalthr Timo Walther
• Votes: 0
• Watchers: 5
