Details

    • Type: Sub-task
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Table API & SQL
    • Labels: None

      Description

      Add Session group-windows for batch tables as described in FLIP-11.
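
      For reference, the Table API usage this sub-task enables (adapted from the test added in the pull request discussed below; `data` and `config` are fixtures of DataSetWindowAggregateITCase):

          val env = ExecutionEnvironment.getExecutionEnvironment
          val tEnv = TableEnvironment.getTableEnvironment(env, config)

          // rows whose 'long timestamps lie within 7 milliseconds of each other
          // fall into the same session window
          val windowedTable = env.fromCollection(data)
            .toTable(tEnv, 'long, 'int, 'string)
            .groupBy('string)
            .window(Session withGap 7.milli on 'long as 'w)
            .select('string, 'string.count, 'w.start, 'w.end)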

        Issue Links

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user sunjincheng121 opened a pull request:

          https://github.com/apache/flink/pull/2942

          FLINK-4693 [tableApi] Add session group-windows for batch tables

          Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
          If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
          In addition to going through the list, please provide a meaningful description of your changes.

          • [×] General
          • The pull request references the related JIRA issue ("FLINK-4693 Add session group-windows for batch tables")
          • The pull request addresses only one issue
          • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [ ] Documentation
          • Documentation has been added for new functionality
          • Old documentation affected by the pull request has been updated
          • JavaDoc for public methods has been added
          • [×] Tests & Build
          • Functionality added by the pull request is covered by tests
          • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/sunjincheng121/flink FLIP11-Batch-FLINK-4693-PR

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2942.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2942


          commit 7c57b8c8e52380f6b08dcc152a32d0e018e39cb0
          Author: Jincheng Sun <sunjincheng121@gmail.com>
          Date: 2016-12-01T09:04:44Z

          FLINK-4693 [tableApi] Add session group-windows for batch tables


          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 closed the pull request at:

          https://github.com/apache/flink/pull/2942

          githubbot ASF GitHub Bot added a comment -

          GitHub user sunjincheng121 opened a pull request:

          https://github.com/apache/flink/pull/2983

          FLINK-4693 [tableApi] Add session group-windows for batch tables

          Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
          If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
          In addition to going through the list, please provide a meaningful description of your changes.

          • [×] General
          • The pull request references the related JIRA issue ("FLINK-4693 Add session group-windows for batch tables")
          • The pull request addresses only one issue
          • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [ ] Documentation
          • Documentation has been added for new functionality
          • Old documentation affected by the pull request has been updated
          • JavaDoc for public methods has been added
          • [×] Tests & Build
          • Functionality added by the pull request is covered by tests
          • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/sunjincheng121/flink FLIP11-Batch-FLINK-4693-PR

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2983.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2983


          commit 7c57b8c8e52380f6b08dcc152a32d0e018e39cb0
          Author: Jincheng Sun <sunjincheng121@gmail.com>
          Date: 2016-12-01T09:04:44Z

          FLINK-4693 [tableApi] Add session group-windows for batch tables

          commit 2b9fb1e9948e9de78e8dbafdf2fc1a87c7614d45
          Author: Jincheng Sun <sunjincheng121@gmail.com>
          Date: 2016-12-10T02:12:25Z

          FLINK-4693 [tableApi] Repair expiration methods.


          githubbot ASF GitHub Bot added a comment -

          Github user KurtYoung commented on the issue:

          https://github.com/apache/flink/pull/2983

          @sunjincheng121 Thanks for your work. Can you rebase this onto the latest master first? I will then take a look.

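          The usual sequence for such a rebase (assuming the fork's remote is named origin and apache/flink is upstream):

              git fetch upstream
              git rebase upstream/master
              git push --force-with-lease origin FLIP11-Batch-FLINK-4693-PR
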
          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/2983

          I think it would make sense to wait until #2938 is merged and then rebase this on top of it.

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/2983

          @twalthr That's right. Thanks @KurtYoung. I'll rebase the code once #2938 is merged.

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/2983

          @sunjincheng121 I have merged #2938. You can update this PR.

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 closed the pull request at:

          https://github.com/apache/flink/pull/2983

          githubbot ASF GitHub Bot added a comment -

          GitHub user sunjincheng121 opened a pull request:

          https://github.com/apache/flink/pull/3150

          FLINK-4693 [tableApi] Add session group-windows for batch tables

          Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
          If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
          In addition to going through the list, please provide a meaningful description of your changes.

          • [×] General
          • The pull request references the related JIRA issue ("FLINK-4693 [tableApi] Add session group-windows for batch tables")
          • The pull request addresses only one issue
          • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [ ] Documentation
          • Documentation has been added for new functionality
          • Old documentation affected by the pull request has been updated
          • JavaDoc for public methods has been added
          • [×] Tests & Build
          • Functionality added by the pull request is covered by tests
          • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/sunjincheng121/flink FLINK-4693-AFTER4692

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3150.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3150


          commit ff7899caca76ee5ff8770c96b2185930b160cad5
          Author: Jincheng Sun <sunjincheng121@gmail.com>
          Date: 2017-01-18T04:34:34Z

          FLINK-4693 [tableApi] Add session group-windows for batch tables


          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/3150

          @fhueske @twalthr I have reopened the PR and proposed a design doc:
          https://docs.google.com/document/d/1WIOKNwfTW8nMZsRVL-ANIzxIiuXqVNGxwjiPj7go7hg/edit

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/3150

          Thanks for updating the PR @sunjincheng121. I will shepherd this.

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3150#discussion_r96609935

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala —
          @@ -119,4 +119,35 @@ class DataSetWindowAggregateITCase(
              val results = windowedTable.toDataSet[Row].collect()
              TestBaseUtils.compareResultAsText(results.asJava, expected)
            }
          +
          +  @Test
          +  def testEventTimeSessionGroupWindow(): Unit = {
          +    val env = ExecutionEnvironment.getExecutionEnvironment
          +    val tEnv = TableEnvironment.getTableEnvironment(env, config)
          +
          +    val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 'string)
          +    val windowedTable = table
          +      .groupBy('string)
          +      .window(Session withGap 7.milli on 'long as 'w)
          +      .select('string, 'string.count, 'w.start, 'w.end)
          +
          +    val results = windowedTable.toDataSet[Row].collect()
          +
          +    val expected = "Hallo,1,1970-01-01 00:00:00.002,1970-01-01 00:00:00.009\nHello world,1," +
          +      "1970-01-01 00:00:00.008,1970-01-01 00:00:00.015\nHello world,1," +
          +      "1970-01-01 00:00:00.016,1970-01-01 00:00:00.023\nHello,3,1970-01-01 00:00:00.003," +
          +      "1970-01-01 00:00:00.013\nHi,1,1970-01-01 00:00:00.001,1970-01-01 00:00:00.008"
          +    TestBaseUtils.compareResultAsText(results.asJava, expected)
          +  }
          +
          +  @Test(expected = classOf[UnsupportedOperationException])
          +  def testAlldEventTimeSessionGroupWindow(): Unit = {
          — End diff –

          Would it be much work to also support all event-time session group windows? We support them in streaming, so we should also support them in batch to avoid confusion.

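          For context, the "all" variant is the same session window without the preceding groupBy (a sketch in the Table API style of this PR's tests; per the test above, this PR still rejects it on batch with an UnsupportedOperationException):

              val windowedTable = table
                .window(Session withGap 7.milli on 'long as 'w)
                .select('string.count, 'w.start, 'w.end)
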
          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3150#discussion_r96629778

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala —
          @@ -0,0 +1,164 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.lang.Iterable
          +
          +import org.apache.flink.api.common.functions.RichGroupReduceFunction
          +import org.apache.flink.types.Row
          +import org.apache.flink.configuration.Configuration
          +import org.apache.flink.util.{Collector, Preconditions}
          +
          +import scala.collection.JavaConversions._
          +import org.apache.flink.streaming.api.windowing.windows.TimeWindow
          +
          +/**
          + * It wraps the aggregate logic inside of
          + * [[org.apache.flink.api.java.operators.GroupReduceOperator]]. It is used for Session time-window
          + * on batch.
          + *
          + * @param aggregates The aggregate functions.
          + * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
          + *        and output Row.
          + * @param aggregateMapping The index mapping between aggregate function list and aggregated value
          + *        index in output Row.
          + * @param intermediateRowArity The intermediate row field count.
          + * @param finalRowArity The output row field count.
          + * @param finalRowWindowStartPos The relative window-start field position.
          + * @param finalRowWindowEndPos The relative window-end field position.
          + * @param gap Session time window gap.
          + */
          +class DataSetSessionWindowAggregateReduceGroupFunction(
          +    aggregates: Array[Aggregate[_ <: Any]],
          +    groupKeysMapping: Array[(Int, Int)],
          +    aggregateMapping: Array[(Int, Int)],
          +    intermediateRowArity: Int,
          +    finalRowArity: Int,
          +    finalRowWindowStartPos: Option[Int],
          +    finalRowWindowEndPos: Option[Int],
          +    gap: Long)
          +  extends RichGroupReduceFunction[Row, Row] {
          +
          +  private var aggregateBuffer: Row = _
          +  private var output: Row = _
          +  private var collector: TimeWindowPropertyCollector = _
          +  private var intermediateRowWindowStartPos = 0
          +  private var intermediateRowWindowEndPos = 0
          +
          +  override def open(config: Configuration) {
          +    Preconditions.checkNotNull(aggregates)
          +    Preconditions.checkNotNull(groupKeysMapping)
          +    aggregateBuffer = new Row(intermediateRowArity)
          +    intermediateRowWindowStartPos = intermediateRowArity - 2
          +    intermediateRowWindowEndPos = intermediateRowArity - 1
          +    output = new Row(finalRowArity)
          +    collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos)
          +  }
          +
          +  /**
          +   * For grouped intermediate aggregate Rows, divide window according to the window-start
          +   * and window-end, merge data (within a unified window) into an aggregate buffer, calculate
          +   * aggregated values output from aggregate buffer, and then set them into output
          +   * Row based on the mapping relationship between intermediate aggregate data and output data.
          +   *
          +   * @param records Grouped intermediate aggregate Rows iterator.
          +   * @param out The collector to hand results to.
          +   */
          +  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
          +
          +    var last: Row = null
          +    var head: Row = null
          +    var lastWindowEnd: Option[Long] = None
          +    var currentWindowStart: Option[Long] = None
          +
          +    records.foreach(
          +      (record) => {
          +        currentWindowStart =
          +          Some(record.getField(intermediateRowWindowStartPos).asInstanceOf[Long])
          +        // initial traversal or opening a new window
          +        if (lastWindowEnd.isEmpty ||
          +          (lastWindowEnd.isDefined && currentWindowStart.get > lastWindowEnd.get)) {
          +
          +          // calculate the current window and open a new window
          +          if (lastWindowEnd.isDefined) {
          +            // evaluate and emit the current window's result.
          +            doEvaluateAndCollect(out, last, head)
          +          }
          +          // initiate intermediate aggregate value.
          +          aggregates.foreach(_.initiate(aggregateBuffer))
          +          head = record
          +        }
          +
          +        aggregates.foreach(_.merge(record, aggregateBuffer))
          +        last = record
          +        lastWindowEnd = Some(getWindowEnd(last))
          +      })
          +
          +    doEvaluateAndCollect(out, last, head)
          +  }
          +
          +  def doEvaluateAndCollect(
          +      out: Collector[Row],
          +      last: Row,
          +      head: Row): Unit = {
          +    // set group keys value to final output.
          +    groupKeysMapping.foreach {
          +      case (after, previous) =>
          +        output.setField(after, last.getField(previous))
          +    }
          +
          +    // evaluate final aggregate value and set to output.
          +    aggregateMapping.foreach {
          +      case (after, previous) =>
          +        output.setField(after, aggregates(previous).evaluate(aggregateBuffer))
          +    }
          +
          +    // adds TimeWindow properties to output then emit output
          +    if (finalRowWindowStartPos.isDefined || finalRowWindowEndPos.isDefined) {
          +      val start =
          +        head.getField(intermediateRowWindowStartPos).asInstanceOf[Long]
          +      val end = getWindowEnd(last)
          +
          +      collector.wrappedCollector = out
          +      collector.timeWindow = new TimeWindow(start, end)
          +
          +      collector.collect(output)
          +    } else {
          +      out.collect(output)
          +    }
          +  }
          +
          +  def getWindowEnd(record: Row): Long = {
          +
          +    // when partial aggregate is not supported, the input data structure of reduce is
          +    // |groupKey1|groupKey2|sum1|count1|sum2|count2|rowTime|
          — End diff –

          I think this comment should also be in the Scaladoc of this class. It is easy to miss that this function supports two different input types.

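          A sketch of the suggested Scaladoc note (the second layout is inferred from the field positions used in open() and getWindowEnd, so treat it as an assumption):

              /**
               * ...
               * Note: this function accepts two input layouts, depending on whether a combiner ran:
               *   without partial aggregation: |groupKey1|groupKey2|sum1|count1|sum2|count2|rowTime|
               *   with partial aggregation:    |groupKey1|groupKey2|sum1|count1|sum2|count2|windowStart|windowEnd|
               */
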
          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3150#discussion_r96606158

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateCombineGroupFunction.scala —
          @@ -0,0 +1,133 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.lang.Iterable
          +
          +import org.apache.flink.api.common.functions.RichGroupCombineFunction
          +import org.apache.flink.api.common.typeinfo.TypeInformation
          +import org.apache.flink.api.java.typeutils.ResultTypeQueryable
          +import org.apache.flink.types.Row
          +import org.apache.flink.configuration.Configuration
          +import org.apache.flink.util.{Collector, Preconditions}
          +
          +import scala.collection.JavaConversions._
          +
          +/**
          + * This wraps the aggregate logic inside of
          + * [[org.apache.flink.api.java.operators.GroupCombineOperator]].
          + *
          + * @param aggregates The aggregate functions.
          + * @param groupingKeys
          + * @param intermediateRowArity The intermediate row field count.
          + * @param gap Session time window gap.
          + * @param intermediateRowType Intermediate row data type.
          + */
          +class DataSetSessionWindowAggregateCombineGroupFunction(
          +    aggregates: Array[Aggregate[_ <: Any]],
          +    groupingKeys: Array[Int],
          +    intermediateRowArity: Int,
          +    gap: Long,
          +    @transient intermediateRowType: TypeInformation[Row])
          +  extends RichGroupCombineFunction[Row, Row] with ResultTypeQueryable[Row] {
          +
          +  private var aggregateBuffer: Row = _
          +  private var rowTimePos = 0
          +
          +  override def open(config: Configuration) {
          +    Preconditions.checkNotNull(aggregates)
          +    Preconditions.checkNotNull(groupingKeys)
          +    aggregateBuffer = new Row(intermediateRowArity)
          +    rowTimePos = intermediateRowArity - 2
          +  }
          +
          +  /**
          +   * For sub-grouped intermediate aggregate Rows, divide window based on the row-time
          +   * (current row-time - previous row-time > gap), and then merge data (within a unified window)
          +   * into an aggregate buffer.
          +   *
          +   * @param records Sub-grouped intermediate aggregate Rows.
          +   * @return Combined intermediate aggregate Row.
          +   */
          +  override def combine(
          +      records: Iterable[Row],
          +      out: Collector[Row]): Unit = {
          +
          +    var head: Row = null
          +    var lastRowTime: Option[Long] = None
          +    var currentRowTime: Option[Long] = None
          +
          +    records.foreach(
          +      (record) => {
          +        currentRowTime = Some(record.getField(rowTimePos).asInstanceOf[Long])
          — End diff –

          We should not use Scala magic here. I don't know how `Option` is implemented, but this `Some` could create an object for every record at runtime, which is not very efficient. I recommend using `null` here.

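          A self-contained sketch in the spirit of that suggestion (not the PR's actual code; it uses a Long.MinValue sentinel rather than a boxed null, which works because row-times are non-negative epoch millis):

              // sessionize sorted row-times with a primitive sentinel instead of
              // Option[Long], avoiding a boxed Some(...) allocation per record
              def sessionStarts(sortedRowTimes: Array[Long], gap: Long): Seq[Long] = {
                val starts = scala.collection.mutable.ArrayBuffer[Long]()
                var lastRowTime = Long.MinValue // sentinel: no record seen yet
                for (t <- sortedRowTimes) {
                  if (lastRowTime == Long.MinValue || t - lastRowTime > gap) {
                    starts += t // the gap was exceeded, a new session opens here
                  }
                  lastRowTime = t
                }
                starts
              }

              // sessionStarts(Array(1L, 3L, 8L, 20L, 25L), gap = 7) returns Seq(1, 20)
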
          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3150#discussion_r96601727

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala —
          @@ -218,6 +216,84 @@ class DataSetWindowAggregate(
              }
            }

          +  private[this] def createEventTimeSessionWindowDataSet(
          +      inputDS: DataSet[Any],
          +      isParserCaseSensitive: Boolean): DataSet[Any] = {
          +
          +    val groupingKeys = grouping.indices.toArray
          +    val rowTypeInfo = resultRowTypeInfo
          +
          +    // grouping window
          +    if (groupingKeys.length > 0) {
          +      // create mapFunction for initializing the aggregations
          +      val mapFunction = createDataSetWindowPrepareMapFunction(
          +        window,
          +        namedAggregates,
          +        grouping,
          +        inputType,
          +        isParserCaseSensitive)
          +
          +      // create groupReduceFunction for calculating the aggregations
          +      val groupReduceFunction = createDataSetWindowAggregationGroupReduceFunction(
          +        window,
          +        namedAggregates,
          +        inputType,
          +        rowRelDataType,
          +        grouping,
          +        namedProperties)
          +
          +      val mappedInput =
          +        inputDS
          +          .map(mapFunction)
          +          .name(prepareOperatorName)
          +
          +      val mapReturnType = mapFunction.asInstanceOf[ResultTypeQueryable[Row]].getProducedType
          +
          +      // the position of the rowtime field in the intermediate result for map output
          +      val rowTimeFilePos = mapReturnType.getArity - 1
          +
          +      // gets the window-start and window-end position in the intermediate result.
          +      val windowStartPos = rowTimeFilePos
          +      val windowEndPos = windowStartPos + 1
          — End diff –

          I would move the window start/end pos into the incremental aggregation `if` branch. It is not used for the non-incremental part.

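          Roughly the restructuring being suggested (a sketch; supportsPartialAggregation is a placeholder name for whatever condition actually guards the incremental path):

              if (supportsPartialAggregation) {
                // the positions are only needed to wire up the combine function
                val windowStartPos = rowTimeFilePos
                val windowEndPos = windowStartPos + 1
                // ...
              }
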
          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3150#discussion_r96609116

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala —
          @@ -0,0 +1,164 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.lang.Iterable
          +
          +import org.apache.flink.api.common.functions.RichGroupReduceFunction
          +import org.apache.flink.types.Row
          +import org.apache.flink.configuration.Configuration
          +import org.apache.flink.util.{Collector, Preconditions}
          +
          +import scala.collection.JavaConversions._
          +import org.apache.flink.streaming.api.windowing.windows.TimeWindow
          +
          +/**
          + * It wraps the aggregate logic inside of
          + * [[org.apache.flink.api.java.operators.GroupReduceOperator]]. It is used for Session time-window
          + * on batch.
          + *
          + * @param aggregates The aggregate functions.
          + * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
          + *        and output Row.
          + * @param aggregateMapping The index mapping between aggregate function list and aggregated value
          + *        index in output Row.
          + * @param intermediateRowArity The intermediate row field count.
          + * @param finalRowArity The output row field count.
          + * @param finalRowWindowStartPos The relative window-start field position.
          + * @param finalRowWindowEndPos The relative window-end field position.
          + * @param gap Session time window gap.
          + */
          +class DataSetSessionWindowAggregateReduceGroupFunction(
          +    aggregates: Array[Aggregate[_ <: Any]],
          +    groupKeysMapping: Array[(Int, Int)],
          +    aggregateMapping: Array[(Int, Int)],
          +    intermediateRowArity: Int,
          +    finalRowArity: Int,
          +    finalRowWindowStartPos: Option[Int],
          +    finalRowWindowEndPos: Option[Int],
          +    gap: Long)
          +  extends RichGroupReduceFunction[Row, Row] {
          +
          +  private var aggregateBuffer: Row = _
          +  private var output: Row = _
          +  private var collector: TimeWindowPropertyCollector = _
          +  private var intermediateRowWindowStartPos = 0
          +  private var intermediateRowWindowEndPos = 0
          +
          +  override def open(config: Configuration) {
          +    Preconditions.checkNotNull(aggregates)
          +    Preconditions.checkNotNull(groupKeysMapping)
          +    aggregateBuffer = new Row(intermediateRowArity)
          +    intermediateRowWindowStartPos = intermediateRowArity - 2
          +    intermediateRowWindowEndPos = intermediateRowArity - 1
          +    output = new Row(finalRowArity)
          +    collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos)
          +  }
          +
          +  /**
          +   * For grouped intermediate aggregate Rows, divide window according to the window-start
          +   * and window-end, merge data (within a unified window) into an aggregate buffer, calculate
          +   * aggregated values output from aggregate buffer, and then set them into output
          +   * Row based on the mapping relationship between intermediate aggregate data and output data.
          +   *
          +   * @param records Grouped intermediate aggregate Rows iterator.
          +   * @param out The collector to hand results to.
          +   */
          +  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
          +
          +    var last: Row = null
          +    var head: Row = null
          +    var lastWindowEnd: Option[Long] = None
          +    var currentWindowStart: Option[Long] = None
          +
          +    records.foreach(
          +      (record) => {
          +        currentWindowStart =
          +          Some(record.getField(intermediateRowWindowStartPos).asInstanceOf[Long])
          +        // initial traversal or opening a new window
          +        if (lastWindowEnd.isEmpty ||
          +          (lastWindowEnd.isDefined && currentWindowStart.get > lastWindowEnd.get)) {
          +
          +          // calculate the current window and open a new window
          +          if (lastWindowEnd.isDefined) {
          +            // evaluate and emit the current window's result.
          +            doEvaluateAndCollect(out, last, head)
          +          }
          +          // initiate intermediate aggregate value.
          +          aggregates.foreach(_.initiate(aggregateBuffer))
          +          head = record
          +        }
          +
          +        aggregates.foreach(_.merge(record, aggregateBuffer))
          +        last = record
          +        lastWindowEnd = Some(getWindowEnd(last))
          +      })
          +
          +    doEvaluateAndCollect(out, last, head)
          +  }
          +
          +  def doEvaluateAndCollect(
          +      out: Collector[Row],
          +      last: Row,
          +      head: Row): Unit = {
          +    // set group keys value to final output.
          +    groupKeysMapping.foreach {
          +      case (after, previous) =>
          +        output.setField(after, last.getField(previous))
          +    }
          +
          +    // evaluate final aggregate value and set to output.
          +    aggregateMapping.foreach {
          +      case (after, previous) =>
          +        output.setField(after, aggregates(previous).evaluate(aggregateBuffer))
          +    }
          +
          +    // adds TimeWindow properties to output then emit output
          +    if (finalRowWindowStartPos.isDefined || finalRowWindowEndPos.isDefined) {
          +      val start =
          +        head.getField(intermediateRowWindowStartPos).asInstanceOf[Long]
          +      val end = getWindowEnd(last)
          +
          +      collector.wrappedCollector = out
          +      collector.timeWindow = new TimeWindow(start, end)
          — End diff –

          Maybe we should modify `TimeWindowPropertyCollector`. It is not very efficient to create an object in order to pass two long values.

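          A hypothetical shape of such a change (class and member names are assumptions, not Flink's actual code; it also assumes the window properties are emitted as SQL timestamps, as in the tests above). The bounds are kept as two longs so no TimeWindow is allocated per emitted row:

              import java.sql.Timestamp
              import org.apache.flink.types.Row
              import org.apache.flink.util.Collector

              class PrimitiveWindowPropertyCollector(
                  windowStartPos: Option[Int],
                  windowEndPos: Option[Int])
                extends Collector[Row] {

                var wrappedCollector: Collector[Row] = _
                var windowStart: Long = _ // set by the caller before collect()
                var windowEnd: Long = _

                override def collect(record: Row): Unit = {
                  // write the window properties straight from the primitive fields
                  windowStartPos.foreach(pos => record.setField(pos, new Timestamp(windowStart)))
                  windowEndPos.foreach(pos => record.setField(pos, new Timestamp(windowEnd)))
                  wrappedCollector.collect(record)
                }

                override def close(): Unit = wrappedCollector.close()
              }
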
          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3150#discussion_r96607999

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala —
          @@ -0,0 +1,164 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.lang.Iterable
          +
          +import org.apache.flink.api.common.functions.RichGroupReduceFunction
          +import org.apache.flink.types.Row
          +import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.{Collector, Preconditions}
+
          +import scala.collection.JavaConversions._
          +import org.apache.flink.streaming.api.windowing.windows.TimeWindow
          +
          +/**
          + * It wraps the aggregate logic inside of
          + * [[org.apache.flink.api.java.operators.GroupReduceOperator]]. It is used for Session time-window
          + * on batch.
          + *
          + * @param aggregates The aggregate functions.
          + * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
          + * and output Row.
          + * @param aggregateMapping The index mapping between aggregate function list and aggregated value
          + * index in output Row.
          + * @param intermediateRowArity The intermediate row field count.
          + * @param finalRowArity The output row field count.
          + * @param finalRowWindowStartPos The relative window-start field position.
          + * @param finalRowWindowEndPos The relative window-end field position.
          + * @param gap Session time window gap.
          + */
          +class DataSetSessionWindowAggregateReduceGroupFunction(
          + aggregates: Array[Aggregate[_ <: Any]],
          + groupKeysMapping: Array[(Int, Int)],
          + aggregateMapping: Array[(Int, Int)],
          + intermediateRowArity: Int,
          + finalRowArity: Int,
          + finalRowWindowStartPos: Option[Int],
          + finalRowWindowEndPos: Option[Int],
          + gap:Long)
          + extends RichGroupReduceFunction[Row, Row] {
          +
          + private var aggregateBuffer: Row = _
          + private var output: Row = _
          + private var collector: TimeWindowPropertyCollector = _
          + private var intermediateRowWindowStartPos = 0
          + private var intermediateRowWindowEndPos = 0
          +
+ override def open(config: Configuration) {
+ Preconditions.checkNotNull(aggregates)
+ Preconditions.checkNotNull(groupKeysMapping)
+ aggregateBuffer = new Row(intermediateRowArity)
+ intermediateRowWindowStartPos = intermediateRowArity - 2
+ intermediateRowWindowEndPos = intermediateRowArity - 1
+ output = new Row(finalRowArity)
+ collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos)
+ }
+
          + /**
          + * For grouped intermediate aggregate Rows, divide window according to the window-start
          + * and window-end, merge data (within a unified window) into an aggregate buffer, calculate
          + * aggregated values output from aggregate buffer, and then set them into output
          + * Row based on the mapping relationship between intermediate aggregate data and output data.
          + *
          + * @param records Grouped intermediate aggregate Rows iterator.
          + * @param out The collector to hand results to.
          + *
          + */
          + override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
          +
          + var last: Row = null
          + var head: Row = null
          + var lastWindowEnd: Option[Long] = None
          + var currentWindowStart: Option[Long] = None
          +
          + records.foreach(
          + (record) => {
          + currentWindowStart =
          + Some(record.getField(intermediateRowWindowStartPos).asInstanceOf[Long])
          — End diff –

          Same comments as in `DataSetSessionWindowAggregateCombineGroupFunction`.

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3150#discussion_r96629837

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala —
          @@ -226,27 +225,102 @@ object AggregateUtil {
          aggregates,
          groupingOffsetMapping,
          aggOffsetMapping,

- intermediateRowArity,
+ intermediateRowArity + 1,// the addition one field is used to store time attribute
outputType.getFieldCount)
+
+ case EventTimeSessionGroupWindow(_, _, gap) =>
+ val (startPos, endPos) = if (isTimeWindow(window)) {
+ computeWindowStartEndPropertyPos(properties)
+ } else {
+ (None, None)
+ }

            + new DataSetSessionWindowAggregateReduceGroupFunction(
            + aggregates,
            + groupingOffsetMapping,
            + aggOffsetMapping,
            + // the addition two fields are used to store window-start and window-end attributes
            + intermediateRowArity + 2,
            + outputType.getFieldCount,
            + startPos,
            + endPos,
            + asLong(gap))
            case _ =>
            throw new UnsupportedOperationException(s"$window is currently not supported on batch")
            }
            }

          /**
          + * Create a [[org.apache.flink.api.common.functions.GroupCombineFunction]] that pre-aggregation
          + * for aggregates.
          + * The function returns intermediate aggregate values of all aggregate function which are
          + * organized by the following format:
          + *
+ * {{{
+ *          avg(x) aggOffsetInRow = 2          count(z) aggOffsetInRow = 5
+ *            |                                   |      windowEnd(max(row-time))
+ *            |                                   |        |
+ *            v                                   v        v
+ * +---------+---------+--------+--------+--------+--------+-----------+---------+
+ * |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |windowStart|windowEnd|
+ * +---------+---------+--------+--------+--------+--------+-----------+---------+
+ *                                  ^                           ^
+ *                                  |                           |
+ *                  sum(y) aggOffsetInRow = 4     windowStart(min(row-time))
+ *
+ * }}}
          + *
          + */
          +
          — End diff –

          Remove empty line.
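
To make the layout documented above concrete, here is a small illustration of such an intermediate row (the values are invented for illustration; the field order follows the diagram):

import org.apache.flink.types.Row

object IntermediateRowLayoutExample extends App {
  // [groupKey1, groupKey2, sum1, count1, sum2, count2, windowStart, windowEnd]
  val intermediate = new Row(8)
  intermediate.setField(0, "keyA")  // groupKey1
  intermediate.setField(1, "keyB")  // groupKey2
  intermediate.setField(2, 42L)     // sum1: partial state of avg(x)
  intermediate.setField(3, 3L)      // count1: partial state of avg(x)
  intermediate.setField(4, 7L)      // sum2: partial state of sum(y)
  intermediate.setField(5, 5L)      // count2: partial state of count(z)
  intermediate.setField(6, 1000L)   // windowStart = min(row-time) seen so far
  intermediate.setField(7, 1012L)   // windowEnd = max(row-time) seen so far
  println(intermediate)
}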

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3150#discussion_r96605812

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateCombineGroupFunction.scala —
          @@ -0,0 +1,133 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.lang.Iterable
          +
          +import org.apache.flink.api.common.functions.RichGroupCombineFunction
          +import org.apache.flink.api.common.typeinfo.TypeInformation
          +import org.apache.flink.api.java.typeutils.ResultTypeQueryable
          +import org.apache.flink.types.Row
          +import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.{Collector, Preconditions}
+
          +import scala.collection.JavaConversions._
          +
          +/**
          + * This wraps the aggregate logic inside of
          + * [[org.apache.flink.api.java.operators.GroupCombineOperator]].
          + *
          + * @param aggregates The aggregate functions.
          + * @param groupingKeys
          + * @param intermediateRowArity The intermediate row field count.
          + * @param gap Session time window gap.
          + * @param intermediateRowType Intermediate row data type.
          + */
          +class DataSetSessionWindowAggregateCombineGroupFunction(
          + aggregates: Array[Aggregate[_ <: Any]],
          + groupingKeys: Array[Int],
          + intermediateRowArity: Int,
          + gap: Long,
          + @transient intermediateRowType: TypeInformation[Row])
          + extends RichGroupCombineFunction[Row,Row] with ResultTypeQueryable[Row] {
          +
          + private var aggregateBuffer: Row = _
          + private var rowTimePos = 0
          +
+ override def open(config: Configuration) {
+ Preconditions.checkNotNull(aggregates)
+ Preconditions.checkNotNull(groupingKeys)
+ aggregateBuffer = new Row(intermediateRowArity)
+ rowTimePos = intermediateRowArity - 2
+ }
+
          + /**
          + * For sub-grouped intermediate aggregate Rows, divide window based on the row-time
          + * (current'row-time - previous’row-time > gap), and then merge data (within a unified window)
          + * into an aggregate buffer.
          + *
          + * @param records Sub-grouped intermediate aggregate Rows .
          + * @return Combined intermediate aggregate Row.
          + *
          + */
          + override def combine(
          + records: Iterable[Row],
          + out: Collector[Row]): Unit = {
          +
          + var head:Row = null
          + var lastRowTime: Option[Long] = None
          + var currentRowTime: Option[Long] = None
          +
          + records.foreach(
          — End diff –

We should avoid Scala magic in runtime classes as much as possible, because we don't know the runtime behavior. Can you replace this with a while loop?
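
For reference, a minimal sketch of the requested rewrite (the standalone method `traverse` is hypothetical; `rowTimePos` and `gap` mirror the fields of the combine function under review):

import java.lang.{Iterable => JIterable}
import org.apache.flink.types.Row

// Sketch: traverse the records with an explicit while loop instead of
// Scala's foreach, so no closure or JavaConversions wrapper is needed
// on the hot path.
def traverse(records: JIterable[Row], rowTimePos: Int, gap: Long): Unit = {
  var lastRowTime = Long.MinValue
  val iterator = records.iterator()
  while (iterator.hasNext) {
    val record = iterator.next()
    val currentRowTime = record.getField(rowTimePos).asInstanceOf[Long]
    // a row-time gap larger than `gap` closes the current session window
    if (lastRowTime != Long.MinValue && currentRowTime - lastRowTime > gap) {
      // emit the buffered session here and reset the aggregate buffer
    }
    lastRowTime = currentRowTime
  }
}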

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/3150

Btw, it is not necessary to close and open a new PR after rebasing. You can simply perform a force push.
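
For reference, a typical rebase-and-force-push sequence (a sketch; `upstream`, `origin`, and `my-feature-branch` are placeholder names for the apache/flink remote, the contributor's fork, and the PR branch):

$ git fetch upstream
$ git rebase upstream/master
$ git push -f origin my-feature-branch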

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3150#discussion_r96635999

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala —
          @@ -119,4 +119,35 @@ class DataSetWindowAggregateITCase(
          val results = windowedTable.toDataSet[Row].collect()
          TestBaseUtils.compareResultAsText(results.asJava, expected)
          }
          +
          + @Test
+ def testEventTimeSessionGroupWindow(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 'string)
+ val windowedTable = table
+ .groupBy('string)
+ .window(Session withGap 7.milli on 'long as 'w)
+ .select('string, 'string.count, 'w.start, 'w.end)
+
+ val results = windowedTable.toDataSet[Row].collect()
+
+ val expected = "Hallo,1,1970-01-01 00:00:00.002,1970-01-01 00:00:00.009\nHello world,1," +
+ "1970-01-01 00:00:00.008,1970-01-01 00:00:00.015\nHello world,1," +
+ "1970-01-01 00:00:00.016,1970-01-01 00:00:00.023\nHello,3,1970-01-01 00:00:00.003," +
+ "1970-01-01 00:00:00.013\nHi,1,1970-01-01 00:00:00.001,1970-01-01 00:00:00.008"
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }

          +
          + @Test(expected = classOf[UnsupportedOperationException])
          + def testAlldEventTimeSessionGroupWindow(): Unit = {
          — End diff –

Yes, I have discussed this with Fabian, and we will add non-grouped session windows for batch tables in FLINK-5219 (https://issues.apache.org/jira/browse/FLINK-5219).

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3150#discussion_r96642258

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala —
          @@ -119,4 +119,35 @@ class DataSetWindowAggregateITCase(
          val results = windowedTable.toDataSet[Row].collect()
          TestBaseUtils.compareResultAsText(results.asJava, expected)
          }
          +
          + @Test
+ def testEventTimeSessionGroupWindow(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 'string)
+ val windowedTable = table
+ .groupBy('string)
+ .window(Session withGap 7.milli on 'long as 'w)
+ .select('string, 'string.count, 'w.start, 'w.end)
+
+ val results = windowedTable.toDataSet[Row].collect()
+
+ val expected = "Hallo,1,1970-01-01 00:00:00.002,1970-01-01 00:00:00.009\nHello world,1," +
+ "1970-01-01 00:00:00.008,1970-01-01 00:00:00.015\nHello world,1," +
+ "1970-01-01 00:00:00.016,1970-01-01 00:00:00.023\nHello,3,1970-01-01 00:00:00.003," +
+ "1970-01-01 00:00:00.013\nHi,1,1970-01-01 00:00:00.001,1970-01-01 00:00:00.008"
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }

          +
          + @Test(expected = classOf[UnsupportedOperationException])
          + def testAlldEventTimeSessionGroupWindow(): Unit = {
          — End diff –

I see. I hadn't seen this issue.

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3150#discussion_r96645235

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala —
          @@ -0,0 +1,164 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.lang.Iterable
          +
          +import org.apache.flink.api.common.functions.RichGroupReduceFunction
          +import org.apache.flink.types.Row
          +import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.{Collector, Preconditions}
+
          +import scala.collection.JavaConversions._
          +import org.apache.flink.streaming.api.windowing.windows.TimeWindow
          +
          +/**
          + * It wraps the aggregate logic inside of
          + * [[org.apache.flink.api.java.operators.GroupReduceOperator]]. It is used for Session time-window
          + * on batch.
          + *
          + * @param aggregates The aggregate functions.
          + * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
          + * and output Row.
          + * @param aggregateMapping The index mapping between aggregate function list and aggregated value
          + * index in output Row.
          + * @param intermediateRowArity The intermediate row field count.
          + * @param finalRowArity The output row field count.
          + * @param finalRowWindowStartPos The relative window-start field position.
          + * @param finalRowWindowEndPos The relative window-end field position.
          + * @param gap Session time window gap.
          + */
          +class DataSetSessionWindowAggregateReduceGroupFunction(
          + aggregates: Array[Aggregate[_ <: Any]],
          + groupKeysMapping: Array[(Int, Int)],
          + aggregateMapping: Array[(Int, Int)],
          + intermediateRowArity: Int,
          + finalRowArity: Int,
          + finalRowWindowStartPos: Option[Int],
          + finalRowWindowEndPos: Option[Int],
          + gap:Long)
          + extends RichGroupReduceFunction[Row, Row] {
          +
          + private var aggregateBuffer: Row = _
          + private var output: Row = _
          + private var collector: TimeWindowPropertyCollector = _
          + private var intermediateRowWindowStartPos = 0
          + private var intermediateRowWindowEndPos = 0
          +
+ override def open(config: Configuration) {
+ Preconditions.checkNotNull(aggregates)
+ Preconditions.checkNotNull(groupKeysMapping)
+ aggregateBuffer = new Row(intermediateRowArity)
+ intermediateRowWindowStartPos = intermediateRowArity - 2
+ intermediateRowWindowEndPos = intermediateRowArity - 1
+ output = new Row(finalRowArity)
+ collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos)
+ }
+
          + /**
          + * For grouped intermediate aggregate Rows, divide window according to the window-start
          + * and window-end, merge data (within a unified window) into an aggregate buffer, calculate
          + * aggregated values output from aggregate buffer, and then set them into output
          + * Row based on the mapping relationship between intermediate aggregate data and output data.
          + *
          + * @param records Grouped intermediate aggregate Rows iterator.
          + * @param out The collector to hand results to.
          + *
          + */
          + override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
          +
          + var last: Row = null
          + var head: Row = null
          + var lastWindowEnd: Option[Long] = None
          + var currentWindowStart: Option[Long] = None
          +
          + records.foreach(
          + (record) => {
          + currentWindowStart =
          + Some(record.getField(intermediateRowWindowStartPos).asInstanceOf[Long])
          + // initial traversal or opening a new window
          + if (lastWindowEnd.isEmpty ||
          + (lastWindowEnd.isDefined && currentWindowStart.get > lastWindowEnd.get)) {
          +
          + // calculate the current window and open a new window
+ if (lastWindowEnd.isDefined) {
+
+ // evaluate and emit the current window's result.
+ doEvaluateAndCollect(out, last, head)
+ }
          + // initiate intermediate aggregate value.
          + aggregates.foreach(_.initiate(aggregateBuffer))
          + head = record
          + }
          +
          + aggregates.foreach(_.merge(record, aggregateBuffer))
          + last = record
          + lastWindowEnd = Some(getWindowEnd(last))
          + })
          +
          + doEvaluateAndCollect(out, last, head)
          +
          + }
          +
          + def doEvaluateAndCollect(
          + out: Collector[Row],
          + last: Row,
          + head: Row): Unit = {
          + // set group keys value to final output.
+ groupKeysMapping.foreach {
+ case (after, previous) =>
+ output.setField(after, last.getField(previous))
+ }
          +
          + // evaluate final aggregate value and set to output.
+ aggregateMapping.foreach {
+ case (after, previous) =>
+ output.setField(after, aggregates(previous).evaluate(aggregateBuffer))
+ }
          +
          + // adds TimeWindow properties to output then emit output
          + if (finalRowWindowStartPos.isDefined || finalRowWindowEndPos.isDefined) {
          + val start =
          + head.getField(intermediateRowWindowStartPos).asInstanceOf[Long]
          + val end = getWindowEnd(last)
          +
          + collector.wrappedCollector = out
          + collector.timeWindow = new TimeWindow(start, end)
          — End diff –

*You are right, maybe we can define `TimeWindowPropertyCollector` as follows:*

class TimeWindowPropertyCollector(xxx) extends Collector[Row] {
  ...
  // var timeWindow: TimeWindow = _
  var windowStart: Long = _
  var windowEnd: Long = _
  ...
}

*What do you think?*
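
A fuller sketch of that idea (the constructor arguments, the positions at which the properties are written, and the `Timestamp` conversion are assumptions for illustration, not taken from the PR):

import java.sql.Timestamp
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

class TimeWindowPropertyCollector(
    windowStartPos: Option[Int],
    windowEndPos: Option[Int])
  extends Collector[Row] {

  var wrappedCollector: Collector[Row] = _
  // two primitive longs instead of one TimeWindow object per emitted row
  var windowStart: Long = _
  var windowEnd: Long = _

  override def collect(record: Row): Unit = {
    windowStartPos.foreach(pos => record.setField(pos, new Timestamp(windowStart)))
    windowEndPos.foreach(pos => record.setField(pos, new Timestamp(windowEnd)))
    wrappedCollector.collect(record)
  }

  override def close(): Unit = wrappedCollector.close()
}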

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3150#discussion_r96645923

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala —
          @@ -0,0 +1,164 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.lang.Iterable
          +
          +import org.apache.flink.api.common.functions.RichGroupReduceFunction
          +import org.apache.flink.types.Row
          +import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.{Collector, Preconditions}
+
          +import scala.collection.JavaConversions._
          +import org.apache.flink.streaming.api.windowing.windows.TimeWindow
          +
          +/**
          + * It wraps the aggregate logic inside of
          + * [[org.apache.flink.api.java.operators.GroupReduceOperator]]. It is used for Session time-window
          + * on batch.
          + *
          + * @param aggregates The aggregate functions.
          + * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
          + * and output Row.
          + * @param aggregateMapping The index mapping between aggregate function list and aggregated value
          + * index in output Row.
          + * @param intermediateRowArity The intermediate row field count.
          + * @param finalRowArity The output row field count.
          + * @param finalRowWindowStartPos The relative window-start field position.
          + * @param finalRowWindowEndPos The relative window-end field position.
          + * @param gap Session time window gap.
          + */
          +class DataSetSessionWindowAggregateReduceGroupFunction(
          + aggregates: Array[Aggregate[_ <: Any]],
          + groupKeysMapping: Array[(Int, Int)],
          + aggregateMapping: Array[(Int, Int)],
          + intermediateRowArity: Int,
          + finalRowArity: Int,
          + finalRowWindowStartPos: Option[Int],
          + finalRowWindowEndPos: Option[Int],
          + gap:Long)
          + extends RichGroupReduceFunction[Row, Row] {
          +
          + private var aggregateBuffer: Row = _
          + private var output: Row = _
          + private var collector: TimeWindowPropertyCollector = _
          + private var intermediateRowWindowStartPos = 0
          + private var intermediateRowWindowEndPos = 0
          +
+ override def open(config: Configuration) {
+ Preconditions.checkNotNull(aggregates)
+ Preconditions.checkNotNull(groupKeysMapping)
+ aggregateBuffer = new Row(intermediateRowArity)
+ intermediateRowWindowStartPos = intermediateRowArity - 2
+ intermediateRowWindowEndPos = intermediateRowArity - 1
+ output = new Row(finalRowArity)
+ collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos)
+ }
+
          + /**
          + * For grouped intermediate aggregate Rows, divide window according to the window-start
          + * and window-end, merge data (within a unified window) into an aggregate buffer, calculate
          + * aggregated values output from aggregate buffer, and then set them into output
          + * Row based on the mapping relationship between intermediate aggregate data and output data.
          + *
          + * @param records Grouped intermediate aggregate Rows iterator.
          + * @param out The collector to hand results to.
          + *
          + */
          + override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
          +
          + var last: Row = null
          + var head: Row = null
          + var lastWindowEnd: Option[Long] = None
          + var currentWindowStart: Option[Long] = None
          +
          + records.foreach(
          + (record) => {
          + currentWindowStart =
          + Some(record.getField(intermediateRowWindowStartPos).asInstanceOf[Long])
          + // initial traversal or opening a new window
          + if (lastWindowEnd.isEmpty ||
          + (lastWindowEnd.isDefined && currentWindowStart.get > lastWindowEnd.get)) {
          +
          + // calculate the current window and open a new window
+ if (lastWindowEnd.isDefined) {
+
+ // evaluate and emit the current window's result.
+ doEvaluateAndCollect(out, last, head)
+ }
          + // initiate intermediate aggregate value.
          + aggregates.foreach(_.initiate(aggregateBuffer))
          + head = record
          + }
          +
          + aggregates.foreach(_.merge(record, aggregateBuffer))
          + last = record
          + lastWindowEnd = Some(getWindowEnd(last))
          + })
          +
          + doEvaluateAndCollect(out, last, head)
          +
          + }
          +
          + def doEvaluateAndCollect(
          + out: Collector[Row],
          + last: Row,
          + head: Row): Unit = {
          + // set group keys value to final output.
+ groupKeysMapping.foreach {
+ case (after, previous) =>
+ output.setField(after, last.getField(previous))
+ }
          +
          + // evaluate final aggregate value and set to output.
+ aggregateMapping.foreach {
+ case (after, previous) =>
+ output.setField(after, aggregates(previous).evaluate(aggregateBuffer))
+ }
          +
          + // adds TimeWindow properties to output then emit output
          + if (finalRowWindowStartPos.isDefined || finalRowWindowEndPos.isDefined) {
          + val start =
          + head.getField(intermediateRowWindowStartPos).asInstanceOf[Long]
          + val end = getWindowEnd(last)
          +
          + collector.wrappedCollector = out
          + collector.timeWindow = new TimeWindow(start, end)
          — End diff –

          Yes, I think this is nicer.

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/3150

Hi @twalthr, thanks a lot for the review. I have updated the PR according to your comments. Let me know if I missed something. Thanks again!

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/3150

          Thanks for updating the PR. +1 to merge.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on the issue:

          https://github.com/apache/flink/pull/3150

          The PR looks good to me, +1 to merge

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3150#discussion_r96810332

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala —
          @@ -128,10 +128,8 @@ class DataSetWindowAggregate(
          inputDS,
          isTimeInterval(size.resultType),
          caseSensitive)
-
- case EventTimeSessionGroupWindow(_, _, _) =>
- throw new UnsupportedOperationException(
- "Event-time session windows in a batch environment are currently not supported")
+ case EventTimeSessionGroupWindow(_, _, gap) =>
+ createEventTimeSessionWindowDataSet(inputDS,caseSensitive)
— End diff –

          add space: `(inputDS, caseSensitive)`

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3150#discussion_r96810423

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala —
          @@ -218,6 +216,85 @@ class DataSetWindowAggregate(
          }
          }

          + private[this] def createEventTimeSessionWindowDataSet(
          + inputDS: DataSet[Any],
          + isParserCaseSensitive: Boolean): DataSet[Any] = {
          +
          + val groupingKeys = grouping.indices.toArray
          + val rowTypeInfo = resultRowTypeInfo
          +
          + // grouping window
          + if (groupingKeys.length > 0) {
          + //create mapFunction for initializing the aggregations
          + val mapFunction = createDataSetWindowPrepareMapFunction(
          + window,
          + namedAggregates,
          + grouping,
          + inputType,isParserCaseSensitive)
          — End diff –

          wrap last argument as well

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3150#discussion_r96817585

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateCombineGroupFunction.scala —
          @@ -0,0 +1,135 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.lang.Iterable
          +
          +import org.apache.flink.api.common.functions.RichGroupCombineFunction
          +import org.apache.flink.api.common.typeinfo.TypeInformation
          +import org.apache.flink.api.java.typeutils.ResultTypeQueryable
          +import org.apache.flink.types.Row
          +import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.{Collector, Preconditions}
+
          +import scala.collection.JavaConversions._
          +
          +/**
          + * This wraps the aggregate logic inside of
          + * [[org.apache.flink.api.java.operators.GroupCombineOperator]].
          + *
          + * @param aggregates The aggregate functions.
          + * @param groupingKeys
          + * @param intermediateRowArity The intermediate row field count.
          + * @param gap Session time window gap.
          + * @param intermediateRowType Intermediate row data type.
          + */
          +class DataSetSessionWindowAggregateCombineGroupFunction(
          + aggregates: Array[Aggregate[_ <: Any]],
          + groupingKeys: Array[Int],
          + intermediateRowArity: Int,
          + gap: Long,
          + @transient intermediateRowType: TypeInformation[Row])
          + extends RichGroupCombineFunction[Row,Row] with ResultTypeQueryable[Row] {
          +
          + private var aggregateBuffer: Row = _
          + private var rowTimePos = 0
          +
+ override def open(config: Configuration) {
+ Preconditions.checkNotNull(aggregates)
+ Preconditions.checkNotNull(groupingKeys)
+ aggregateBuffer = new Row(intermediateRowArity)
+ rowTimePos = intermediateRowArity - 2
+ }

          +
          + /**
          + * For sub-grouped intermediate aggregate Rows, divide window based on the row-time
+ * (current row-time - previous row-time > gap), and then merge data (within a unified window)
          + * into an aggregate buffer.
          + *
          + * @param records Sub-grouped intermediate aggregate Rows .
          + * @return Combined intermediate aggregate Row.
          + *
          + */
          + override def combine(
          + records: Iterable[Row],
          + out: Collector[Row]): Unit = {
          +
          + var head:Row = null
          + var lastRowTime: java.lang.Long = null
          + var currentRowTime: java.lang.Long = null
          +
          + val iterator = records.iterator()
          +
          + while (iterator.hasNext) {
          + val record = iterator.next()
          + currentRowTime = record.getField(rowTimePos).asInstanceOf[Long]
          +
          + // initial traversal or opening a new window
          + // the session window end is equal to last row-time + gap .
          + if (null == lastRowTime ||
          + (null != lastRowTime && (currentRowTime > (lastRowTime + gap)))) {
          +
          + // calculate the current window and open a new window.
+ if (null != lastRowTime) {
+ // emit the current window's merged data
+ doCollect(out, head, lastRowTime)
+ } else {
+ // set group keys to aggregateBuffer.
+ for (i <- 0 until groupingKeys.length) {
+ aggregateBuffer.setField(i, record.getField(i))
+ }
+ }
          +
          + // initiate intermediate aggregate value.
          + aggregates.foreach(_.initiate(aggregateBuffer))
          + head = record
          — End diff –

          Do not remember an object that you received from the combine (or reduce) iterator. The iterator may repeatedly serve the same mutable object. If we only need the timestamp, we should remember only this in a `long`. Or we set the start time immediately in the `aggregateBuffer`.

          See also object reuse mode: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/index.html#operating-on-data-objects-in-functions
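
For readers following the object-reuse point, here is a minimal sketch of one way to apply it in the combine loop (names such as rowTimePos, gap, and aggregateBuffer come from the diff above; the emit and merge steps are elided):

var windowStart: Long = -1L
var lastRowTime: java.lang.Long = null

while (iterator.hasNext) {
  val record = iterator.next()
  val currentRowTime = record.getField(rowTimePos).asInstanceOf[Long]
  if (lastRowTime == null || currentRowTime > lastRowTime + gap) {
    if (lastRowTime != null) {
      // ... emit the merged window using windowStart and (lastRowTime + gap) ...
    }
    windowStart = currentRowTime // a primitive copy survives object reuse
  }
  // ... merge record into aggregateBuffer ...
  lastRowTime = currentRowTime // remember the timestamp, not the mutable Row
}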

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3150#discussion_r96811063

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala —
          @@ -226,27 +225,101 @@ object AggregateUtil {
          aggregates,
          groupingOffsetMapping,
          aggOffsetMapping,

- intermediateRowArity,
+ intermediateRowArity + 1, // the addition one field is used to store time attribute
outputType.getFieldCount)
+
+ case EventTimeSessionGroupWindow(_, _, gap) =>
+ val (startPos, endPos) = if (isTimeWindow(window)) {
— End diff –

          Aren't session windows always time windows?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3150#discussion_r96812620

          — Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala —
          @@ -119,4 +119,35 @@ class DataSetWindowAggregateITCase(
          val results = windowedTable.toDataSet[Row].collect()
          TestBaseUtils.compareResultAsText(results.asJava, expected)
          }
          +
          + @Test
          + def testEventTimeSessionGroupWindow(): Unit = {
          + val env = ExecutionEnvironment.getExecutionEnvironment
          + val tEnv = TableEnvironment.getTableEnvironment(env, config)
          +
          + val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 'string)
          + val windowedTable = table
          + .groupBy('string)
          + .window(Session withGap 7.milli on 'long as 'w)
          + .select('string, 'string.count, 'w.start, 'w.end)
          +
          + val results = windowedTable.toDataSet[Row].collect()
          +
          + val expected = "Hallo,1,1970-01-01 00:00:00.002,1970-01-01 00:00:00.009\nHello world,1," +
          — End diff –

          can you break the lines at the end of the records? Makes it easier to read the expected data.
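
For illustration, the requested layout might look like the following (only the first record is visible in the excerpt above, so the remaining records are elided):

val expected =
  "Hallo,1,1970-01-01 00:00:00.002,1970-01-01 00:00:00.009\n" +
  "Hello world,1,..." // one expected record per source line; the rest is elided here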

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3150#discussion_r96815166

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateCombineGroupFunction.scala —
          @@ -0,0 +1,135 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.lang.Iterable
          +
          +import org.apache.flink.api.common.functions.RichGroupCombineFunction
          +import org.apache.flink.api.common.typeinfo.TypeInformation
          +import org.apache.flink.api.java.typeutils.ResultTypeQueryable
          +import org.apache.flink.types.Row
          +import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.{Collector, Preconditions}
+
          +import scala.collection.JavaConversions._
          +
          +/**
          + * This wraps the aggregate logic inside of
          + * [[org.apache.flink.api.java.operators.GroupCombineOperator]].
          + *
          + * @param aggregates The aggregate functions.
          + * @param groupingKeys
          + * @param intermediateRowArity The intermediate row field count.
          + * @param gap Session time window gap.
          + * @param intermediateRowType Intermediate row data type.
          + */
          +class DataSetSessionWindowAggregateCombineGroupFunction(
          + aggregates: Array[Aggregate[_ <: Any]],
          + groupingKeys: Array[Int],
          + intermediateRowArity: Int,
          + gap: Long,
          + @transient intermediateRowType: TypeInformation[Row])
          + extends RichGroupCombineFunction[Row,Row] with ResultTypeQueryable[Row] {
          +
          + private var aggregateBuffer: Row = _
          + private var rowTimePos = 0
          +
+ override def open(config: Configuration) {
+ Preconditions.checkNotNull(aggregates)
+ Preconditions.checkNotNull(groupingKeys)
+ aggregateBuffer = new Row(intermediateRowArity)
+ rowTimePos = intermediateRowArity - 2
+ }

          +
          + /**
          + * For sub-grouped intermediate aggregate Rows, divide window based on the row-time
+ * (current row-time - previous row-time > gap), and then merge data (within a unified window)
          + * into an aggregate buffer.
          + *
          + * @param records Sub-grouped intermediate aggregate Rows .
          + * @return Combined intermediate aggregate Row.
          + *
          + */
          + override def combine(
          + records: Iterable[Row],
          + out: Collector[Row]): Unit = {
          +
          + var head:Row = null
          + var lastRowTime: java.lang.Long = null
          + var currentRowTime: java.lang.Long = null
          +
          + val iterator = records.iterator()
          +
          + while (iterator.hasNext) {
          + val record = iterator.next()
          + currentRowTime = record.getField(rowTimePos).asInstanceOf[Long]
          — End diff –

wrong indentation?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3150#discussion_r96818876

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala —
          @@ -0,0 +1,179 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.lang.Iterable
          +
          +import org.apache.flink.api.common.functions.RichGroupReduceFunction
          +import org.apache.flink.types.Row
          +import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.{Collector, Preconditions}
+
          +import scala.collection.JavaConversions._
          +import org.apache.flink.streaming.api.windowing.windows.TimeWindow
          +
          +/**
          + * It wraps the aggregate logic inside of
          + * [[org.apache.flink.api.java.operators.GroupReduceOperator]]. It is used for Session time-window
          + * on batch.
          + * Note:
          + * This can handle two input types:
          + * 1. when partial aggregate is not supported, the input data structure of reduce is
          + * |groupKey1|groupKey2|sum1|count1|sum2|count2|rowTime|
          + * 2. when partial aggregate is supported, the input data structure of reduce is
          + * |groupKey1|groupKey2|sum1|count1|sum2|count2|windowStart|windowEnd|
          + *
          + * @param aggregates The aggregate functions.
          + * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
          + * and output Row.
          + * @param aggregateMapping The index mapping between aggregate function list and aggregated value
          + * index in output Row.
          + * @param intermediateRowArity The intermediate row field count.
          + * @param finalRowArity The output row field count.
          + * @param finalRowWindowStartPos The relative window-start field position.
          + * @param finalRowWindowEndPos The relative window-end field position.
          + * @param gap Session time window gap.
          + */
          +class DataSetSessionWindowAggregateReduceGroupFunction(
          + aggregates: Array[Aggregate[_ <: Any]],
          + groupKeysMapping: Array[(Int, Int)],
          + aggregateMapping: Array[(Int, Int)],
          + intermediateRowArity: Int,
          + finalRowArity: Int,
          + finalRowWindowStartPos: Option[Int],
          + finalRowWindowEndPos: Option[Int],
          + gap:Long)
          + extends RichGroupReduceFunction[Row, Row] {
          +
          + private var aggregateBuffer: Row = _
          + private var output: Row = _
          + private var collector: TimeWindowPropertyCollector = _
          + private var intermediateRowWindowStartPos = 0
          + private var intermediateRowWindowEndPos = 0
          +
+ override def open(config: Configuration) {
+ Preconditions.checkNotNull(aggregates)
+ Preconditions.checkNotNull(groupKeysMapping)
+ aggregateBuffer = new Row(intermediateRowArity)
+ intermediateRowWindowStartPos = intermediateRowArity - 2
+ intermediateRowWindowEndPos = intermediateRowArity - 1
+ output = new Row(finalRowArity)
+ collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos)
+ }

          +
          + /**
          + * For grouped intermediate aggregate Rows, divide window according to the window-start
          + * and window-end, merge data (within a unified window) into an aggregate buffer, calculate
          + * aggregated values output from aggregate buffer, and then set them into output
          + * Row based on the mapping relationship between intermediate aggregate data and output data.
          + *
          + * @param records Grouped intermediate aggregate Rows iterator.
          + * @param out The collector to hand results to.
          + *
          + */
          + override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
          +
          + var last: Row = null
          + var head: Row = null
          + var lastWindowEnd: java.lang.Long = null
          + var currentWindowStart:java.lang.Long = null
          +
          + val iterator = records.iterator()
          +
          + while (iterator.hasNext) {
          + val record = iterator.next()
          + currentWindowStart = record.getField(intermediateRowWindowStartPos).asInstanceOf[Long]
          + // initial traversal or opening a new window
          + if (null == lastWindowEnd ||
          + (null != lastWindowEnd && currentWindowStart > lastWindowEnd)) {
          +
          + // calculate the current window and open a new window
+ if (null != lastWindowEnd) {
+ // evaluate and emit the current window's result.
+ doEvaluateAndCollect(out, last, head)
+ }
          + // initiate intermediate aggregate value.
          + aggregates.foreach(_.initiate(aggregateBuffer))
          + head = record
          + }
          +
          + aggregates.foreach(_.merge(record, aggregateBuffer))
          + last = record
          — End diff –

          We cannot remember objects fetched from the iterator (see above).
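
Sketched below is one way to follow this advice in reduce(): keep only primitive copies of the window bounds and write the group keys into the reused output Row eagerly. The Long-based doEvaluateAndCollect signature is an assumption for illustration, not the merged code:

override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
  // track primitives instead of Row references; the iterator may reuse objects
  var windowStart: java.lang.Long = null
  var windowEnd: java.lang.Long = null

  val iterator = records.iterator()
  while (iterator.hasNext) {
    val record = iterator.next()
    val currentWindowStart =
      record.getField(intermediateRowWindowStartPos).asInstanceOf[Long]

    if (windowEnd == null || currentWindowStart > windowEnd) {
      if (windowEnd != null) {
        // hypothetical variant taking the window bounds as Longs
        doEvaluateAndCollect(out, windowStart, windowEnd)
      }
      aggregates.foreach(_.initiate(aggregateBuffer))
      windowStart = currentWindowStart
    }

    // group keys can be copied into the reused output Row right away
    groupKeysMapping.foreach { case (after, previous) =>
      output.setField(after, record.getField(previous))
    }
    aggregates.foreach(_.merge(record, aggregateBuffer))
    windowEnd = getWindowEnd(record)
  }
  doEvaluateAndCollect(out, windowStart, windowEnd)
}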

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3150#discussion_r96818855

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala —
          @@ -0,0 +1,179 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.lang.Iterable
          +
          +import org.apache.flink.api.common.functions.RichGroupReduceFunction
          +import org.apache.flink.types.Row
          +import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.{Collector, Preconditions}
+
          +import scala.collection.JavaConversions._
          +import org.apache.flink.streaming.api.windowing.windows.TimeWindow
          +
          +/**
          + * It wraps the aggregate logic inside of
          + * [[org.apache.flink.api.java.operators.GroupReduceOperator]]. It is used for Session time-window
          + * on batch.
          + * Note:
          + * This can handle two input types:
          + * 1. when partial aggregate is not supported, the input data structure of reduce is
          + * |groupKey1|groupKey2|sum1|count1|sum2|count2|rowTime|
          + * 2. when partial aggregate is supported, the input data structure of reduce is
          + * |groupKey1|groupKey2|sum1|count1|sum2|count2|windowStart|windowEnd|
          + *
          + * @param aggregates The aggregate functions.
          + * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
          + * and output Row.
          + * @param aggregateMapping The index mapping between aggregate function list and aggregated value
          + * index in output Row.
          + * @param intermediateRowArity The intermediate row field count.
          + * @param finalRowArity The output row field count.
          + * @param finalRowWindowStartPos The relative window-start field position.
          + * @param finalRowWindowEndPos The relative window-end field position.
          + * @param gap Session time window gap.
          + */
          +class DataSetSessionWindowAggregateReduceGroupFunction(
          + aggregates: Array[Aggregate[_ <: Any]],
          + groupKeysMapping: Array[(Int, Int)],
          + aggregateMapping: Array[(Int, Int)],
          + intermediateRowArity: Int,
          + finalRowArity: Int,
          + finalRowWindowStartPos: Option[Int],
          + finalRowWindowEndPos: Option[Int],
          + gap:Long)
          + extends RichGroupReduceFunction[Row, Row] {
          +
          + private var aggregateBuffer: Row = _
          + private var output: Row = _
          + private var collector: TimeWindowPropertyCollector = _
          + private var intermediateRowWindowStartPos = 0
          + private var intermediateRowWindowEndPos = 0
          +
+ override def open(config: Configuration) {
+ Preconditions.checkNotNull(aggregates)
+ Preconditions.checkNotNull(groupKeysMapping)
+ aggregateBuffer = new Row(intermediateRowArity)
+ intermediateRowWindowStartPos = intermediateRowArity - 2
+ intermediateRowWindowEndPos = intermediateRowArity - 1
+ output = new Row(finalRowArity)
+ collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos)
+ }

          +
          + /**
          + * For grouped intermediate aggregate Rows, divide window according to the window-start
          + * and window-end, merge data (within a unified window) into an aggregate buffer, calculate
          + * aggregated values output from aggregate buffer, and then set them into output
          + * Row based on the mapping relationship between intermediate aggregate data and output data.
          + *
          + * @param records Grouped intermediate aggregate Rows iterator.
          + * @param out The collector to hand results to.
          + *
          + */
          + override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
          +
          + var last: Row = null
          + var head: Row = null
          + var lastWindowEnd: java.lang.Long = null
          + var currentWindowStart:java.lang.Long = null
          +
          + val iterator = records.iterator()
          +
          + while (iterator.hasNext) {
          + val record = iterator.next()
          + currentWindowStart = record.getField(intermediateRowWindowStartPos).asInstanceOf[Long]
          + // initial traversal or opening a new window
          + if (null == lastWindowEnd ||
          + (null != lastWindowEnd && currentWindowStart > lastWindowEnd)) {
          +
          + // calculate the current window and open a new window
+ if (null != lastWindowEnd) {
+ // evaluate and emit the current window's result.
+ doEvaluateAndCollect(out, last, head)
+ }
          + // initiate intermediate aggregate value.
          + aggregates.foreach(_.initiate(aggregateBuffer))
          + head = record
          — End diff –

          We cannot remember objects fetched from the iterator (see above).

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3150#discussion_r96820389

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala —
          @@ -0,0 +1,179 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.lang.Iterable
          +
          +import org.apache.flink.api.common.functions.RichGroupReduceFunction
          +import org.apache.flink.types.Row
          +import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.{Collector, Preconditions}
+
          +import scala.collection.JavaConversions._
          +import org.apache.flink.streaming.api.windowing.windows.TimeWindow
          +
          +/**
          + * It wraps the aggregate logic inside of
          + * [[org.apache.flink.api.java.operators.GroupReduceOperator]]. It is used for Session time-window
          + * on batch.
          + * Note:
          + * This can handle two input types:
          + * 1. when partial aggregate is not supported, the input data structure of reduce is
          + * |groupKey1|groupKey2|sum1|count1|sum2|count2|rowTime|
          + * 2. when partial aggregate is supported, the input data structure of reduce is
          + * |groupKey1|groupKey2|sum1|count1|sum2|count2|windowStart|windowEnd|
          + *
          + * @param aggregates The aggregate functions.
          + * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
          + * and output Row.
          + * @param aggregateMapping The index mapping between aggregate function list and aggregated value
          + * index in output Row.
          + * @param intermediateRowArity The intermediate row field count.
          + * @param finalRowArity The output row field count.
          + * @param finalRowWindowStartPos The relative window-start field position.
          + * @param finalRowWindowEndPos The relative window-end field position.
          + * @param gap Session time window gap.
          + */
          +class DataSetSessionWindowAggregateReduceGroupFunction(
          + aggregates: Array[Aggregate[_ <: Any]],
          + groupKeysMapping: Array[(Int, Int)],
          + aggregateMapping: Array[(Int, Int)],
          + intermediateRowArity: Int,
          + finalRowArity: Int,
          + finalRowWindowStartPos: Option[Int],
          + finalRowWindowEndPos: Option[Int],
          + gap:Long)
          + extends RichGroupReduceFunction[Row, Row] {
          +
          + private var aggregateBuffer: Row = _
          + private var output: Row = _
          + private var collector: TimeWindowPropertyCollector = _
          + private var intermediateRowWindowStartPos = 0
          + private var intermediateRowWindowEndPos = 0
          +
+ override def open(config: Configuration) {
+ Preconditions.checkNotNull(aggregates)
+ Preconditions.checkNotNull(groupKeysMapping)
+ aggregateBuffer = new Row(intermediateRowArity)
+ intermediateRowWindowStartPos = intermediateRowArity - 2
+ intermediateRowWindowEndPos = intermediateRowArity - 1
+ output = new Row(finalRowArity)
+ collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos)
+ }

          +
          + /**
          + * For grouped intermediate aggregate Rows, divide window according to the window-start
          + * and window-end, merge data (within a unified window) into an aggregate buffer, calculate
          + * aggregated values output from aggregate buffer, and then set them into output
          + * Row based on the mapping relationship between intermediate aggregate data and output data.
          + *
          + * @param records Grouped intermediate aggregate Rows iterator.
          + * @param out The collector to hand results to.
          + *
          + */
          + override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
          +
          + var last: Row = null
          + var head: Row = null
          + var lastWindowEnd: java.lang.Long = null
          + var currentWindowStart:java.lang.Long = null
          +
          + val iterator = records.iterator()
          +
          + while (iterator.hasNext) {
          + val record = iterator.next()
          + currentWindowStart = record.getField(intermediateRowWindowStartPos).asInstanceOf[Long]
          + // initial traversal or opening a new window
          + if (null == lastWindowEnd ||
          + (null != lastWindowEnd && currentWindowStart > lastWindowEnd)) {
          +
          + // calculate the current window and open a new window
+ if (null != lastWindowEnd) {
+ // evaluate and emit the current window's result.
+ doEvaluateAndCollect(out, last, head)
+ }
          + // initiate intermediate aggregate value.
          + aggregates.foreach(_.initiate(aggregateBuffer))
          + head = record
          + }
          +
          + aggregates.foreach(_.merge(record, aggregateBuffer))
          + last = record
          + lastWindowEnd = getWindowEnd(last)
          + }
          +
          + doEvaluateAndCollect(out, last, head)
          +
          + }
          +
          + def doEvaluateAndCollect(
          + out: Collector[Row],
          + last: Row,
          + head: Row): Unit = {
          + // set group keys value to final output.
+ groupKeysMapping.foreach {
+ case (after, previous) =>
+ output.setField(after, last.getField(previous))
+ }
+
+ // evaluate final aggregate value and set to output.
+ aggregateMapping.foreach {
+ case (after, previous) =>
+ output.setField(after, aggregates(previous).evaluate(aggregateBuffer))
+ }
+
+ // adds TimeWindow properties to output then emit output
+ if (finalRowWindowStartPos.isDefined || finalRowWindowEndPos.isDefined) {
+ val start =
+ head.getField(intermediateRowWindowStartPos).asInstanceOf[Long]
+ val end = getWindowEnd(last)
+
+ collector.wrappedCollector = out
+ collector.windowStart = start
+ collector.windowEnd = end
+
+ collector.collect(output)
+ } else {
+ out.collect(output)
+ }
+ }
          +
          + /**
          + * when partial aggregate is not supported, the input data structure of reduce is
          + * |groupKey1|groupKey2|sum1|count1|sum2|count2|rowTime|
          + * when partial aggregate is supported, the input data structure of reduce is
          + * |groupKey1|groupKey2|sum1|count1|sum2|count2|windowStart|windowEnd|
          + *
          + * @param record The last record in the window
          + * @return window-end time.
          + */
          + def getWindowEnd(record: Row): Long = {
          + // when partial aggregate is not supported, the input data structure of reduce is
          + // |groupKey1|groupKey2|sum1|count1|sum2|count2|rowTime|
          + if (record.getArity == intermediateRowWindowEndPos) {
          — End diff –

          Add a `final boolean` field to the class that indicates whether the input was combined or not. This makes the evaluation of the condition more efficient during execution.
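
A standalone sketch of what such a flag could look like; the parameter names below are illustrative assumptions, not the merged code:

// isInputCombined would be fixed at construction time by the plan translation,
// so the window-end lookup no longer inspects the record's arity per row.
def getWindowEnd(
    record: org.apache.flink.types.Row,
    isInputCombined: Boolean, // true if a combiner materialized windowStart/windowEnd
    rowTimeOrWindowStartPos: Int,
    windowEndPos: Int,
    gap: Long): Long = {
  if (isInputCombined) {
    // combined input: |...|windowStart|windowEnd|
    record.getField(windowEndPos).asInstanceOf[Long]
  } else {
    // uncombined input: |...|rowTime|, so the session ends at rowTime + gap
    record.getField(rowTimeOrWindowStartPos).asInstanceOf[Long] + gap
  }
}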

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3150#discussion_r96816754

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateCombineGroupFunction.scala —
          @@ -0,0 +1,135 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.lang.Iterable
          +
          +import org.apache.flink.api.common.functions.RichGroupCombineFunction
          +import org.apache.flink.api.common.typeinfo.TypeInformation
          +import org.apache.flink.api.java.typeutils.ResultTypeQueryable
          +import org.apache.flink.types.Row
          +import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.{Collector, Preconditions}
+
          +import scala.collection.JavaConversions._
          +
          +/**
          + * This wraps the aggregate logic inside of
          + * [[org.apache.flink.api.java.operators.GroupCombineOperator]].
          + *
          + * @param aggregates The aggregate functions.
          + * @param groupingKeys
          + * @param intermediateRowArity The intermediate row field count.
          + * @param gap Session time window gap.
          + * @param intermediateRowType Intermediate row data type.
          + */
          +class DataSetSessionWindowAggregateCombineGroupFunction(
          + aggregates: Array[Aggregate[_ <: Any]],
          + groupingKeys: Array[Int],
          + intermediateRowArity: Int,
          + gap: Long,
          + @transient intermediateRowType: TypeInformation[Row])
          + extends RichGroupCombineFunction[Row,Row] with ResultTypeQueryable[Row] {
          +
          + private var aggregateBuffer: Row = _
          + private var rowTimePos = 0
          +
+ override def open(config: Configuration) {
+   Preconditions.checkNotNull(aggregates)
+   Preconditions.checkNotNull(groupingKeys)
+   aggregateBuffer = new Row(intermediateRowArity)
+   rowTimePos = intermediateRowArity - 2
+ }
+
          + /**
+ * For sub-grouped intermediate aggregate Rows, divide windows based on the row-time
+ * (current row-time - previous row-time > gap), and then merge data (within a unified window)
          + * into an aggregate buffer.
          + *
+ * @param records Sub-grouped intermediate aggregate Rows.
          + * @return Combined intermediate aggregate Row.
          + *
          + */
          + override def combine(
          + records: Iterable[Row],
          + out: Collector[Row]): Unit = {
          +
          + var head:Row = null
          — End diff –

          add space

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3150#discussion_r96820016

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala —
          @@ -0,0 +1,179 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.lang.Iterable
          +
          +import org.apache.flink.api.common.functions.RichGroupReduceFunction
          +import org.apache.flink.types.Row
          +import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.{Collector, Preconditions}
+
          +import scala.collection.JavaConversions._
          +import org.apache.flink.streaming.api.windowing.windows.TimeWindow
          +
          +/**
          + * It wraps the aggregate logic inside of
          + * [[org.apache.flink.api.java.operators.GroupReduceOperator]]. It is used for Session time-window
          + * on batch.
          + * Note:
          + * This can handle two input types:
          + * 1. when partial aggregate is not supported, the input data structure of reduce is
          + * |groupKey1|groupKey2|sum1|count1|sum2|count2|rowTime|
          + * 2. when partial aggregate is supported, the input data structure of reduce is
          + * |groupKey1|groupKey2|sum1|count1|sum2|count2|windowStart|windowEnd|
          + *
          + * @param aggregates The aggregate functions.
          + * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
          + * and output Row.
          + * @param aggregateMapping The index mapping between aggregate function list and aggregated value
          + * index in output Row.
          + * @param intermediateRowArity The intermediate row field count.
          + * @param finalRowArity The output row field count.
          + * @param finalRowWindowStartPos The relative window-start field position.
          + * @param finalRowWindowEndPos The relative window-end field position.
          + * @param gap Session time window gap.
          + */
          +class DataSetSessionWindowAggregateReduceGroupFunction(
          + aggregates: Array[Aggregate[_ <: Any]],
          + groupKeysMapping: Array[(Int, Int)],
          + aggregateMapping: Array[(Int, Int)],
          + intermediateRowArity: Int,
          + finalRowArity: Int,
          + finalRowWindowStartPos: Option[Int],
          + finalRowWindowEndPos: Option[Int],
          + gap:Long)
          + extends RichGroupReduceFunction[Row, Row] {
          +
          + private var aggregateBuffer: Row = _
          + private var output: Row = _
          + private var collector: TimeWindowPropertyCollector = _
          + private var intermediateRowWindowStartPos = 0
          + private var intermediateRowWindowEndPos = 0
          +
+ override def open(config: Configuration) {
+   Preconditions.checkNotNull(aggregates)
+   Preconditions.checkNotNull(groupKeysMapping)
+   aggregateBuffer = new Row(intermediateRowArity)
+   intermediateRowWindowStartPos = intermediateRowArity - 2
+   intermediateRowWindowEndPos = intermediateRowArity - 1
+   output = new Row(finalRowArity)
+   collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos)
+ }
+
          + /**
          + * For grouped intermediate aggregate Rows, divide window according to the window-start
          + * and window-end, merge data (within a unified window) into an aggregate buffer, calculate
          + * aggregated values output from aggregate buffer, and then set them into output
          + * Row based on the mapping relationship between intermediate aggregate data and output data.
          + *
          + * @param records Grouped intermediate aggregate Rows iterator.
          + * @param out The collector to hand results to.
          + *
          + */
          + override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
          +
          + var last: Row = null
          + var head: Row = null
          + var lastWindowEnd: java.lang.Long = null
          + var currentWindowStart:java.lang.Long = null
          +
          + val iterator = records.iterator()
          +
          + while (iterator.hasNext) {
          + val record = iterator.next()
          + currentWindowStart = record.getField(intermediateRowWindowStartPos).asInstanceOf[Long]
          + // initial traversal or opening a new window
          + if (null == lastWindowEnd ||
          + (null != lastWindowEnd && currentWindowStart > lastWindowEnd)) {
          +
          + // calculate the current window and open a new window
+ if (null != lastWindowEnd) {
+   // evaluate and emit the current window's result.
+   doEvaluateAndCollect(out, last, head)
+ }
          + // initiate intermediate aggregate value.
          + aggregates.foreach(_.initiate(aggregateBuffer))
          + head = record
          + }
          +
          + aggregates.foreach(_.merge(record, aggregateBuffer))
          + last = record
          + lastWindowEnd = getWindowEnd(last)
          + }
          +
          + doEvaluateAndCollect(out, last, head)
          +
          + }
          +
          + def doEvaluateAndCollect(
          + out: Collector[Row],
          + last: Row,
          + head: Row): Unit = {
          + // set group keys value to final output.
+ groupKeysMapping.foreach {
+   case (after, previous) =>
+     output.setField(after, last.getField(previous))
+ }
          +
          + // evaluate final aggregate value and set to output.
+ aggregateMapping.foreach {
+   case (after, previous) =>
+     output.setField(after, aggregates(previous).evaluate(aggregateBuffer))
+ }
          +
          + // adds TimeWindow properties to output then emit output
          + if (finalRowWindowStartPos.isDefined || finalRowWindowEndPos.isDefined) {
          + val start =
          — End diff –

          store `start` and `end` in `reduce()` when iterating over the records.
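A hedged sketch of this refactoring: keep the window bounds in local variables while iterating, so the evaluation step receives plain longs and no buffered `head` row is needed. The `emitWindow` and `windowEndOf` hooks below are stand-ins for the elided aggregation logic, not PR code.

import java.lang.Iterable

import org.apache.flink.types.Row
import org.apache.flink.util.Collector

object SessionReduceSketch {
  // Only the window-bound bookkeeping is shown; aggregate init/merge is elided.
  def reduce(
      records: Iterable[Row],
      out: Collector[Row],
      windowStartPos: Int,
      windowEndOf: Row => Long,
      emitWindow: (Collector[Row], Long, Long) => Unit): Unit = {

    var windowStart = 0L   // start of the session currently being built
    var windowEnd = 0L     // running end of that session
    var open = false       // whether a session is currently open

    val it = records.iterator()
    while (it.hasNext) {
      val record = it.next()
      val currentStart = record.getField(windowStartPos).asInstanceOf[Long]
      // a gap between the previous end and the current start closes the session
      if (open && currentStart > windowEnd) {
        emitWindow(out, windowStart, windowEnd)
        open = false
      }
      if (!open) {
        windowStart = currentStart    // store the bound directly, as suggested
        open = true
      }
      windowEnd = windowEndOf(record) // keep the latest end seen so far
    }
    if (open) {
      emitWindow(out, windowStart, windowEnd)
    }
  }
}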

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3150#discussion_r96819808

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateReduceGroupFunction.scala —
          @@ -0,0 +1,179 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import java.lang.Iterable
          +
          +import org.apache.flink.api.common.functions.RichGroupReduceFunction
          +import org.apache.flink.types.Row
          +import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.{Collector, Preconditions}
+
          +import scala.collection.JavaConversions._
          +import org.apache.flink.streaming.api.windowing.windows.TimeWindow
          +
          +/**
          + * It wraps the aggregate logic inside of
          + * [[org.apache.flink.api.java.operators.GroupReduceOperator]]. It is used for Session time-window
          + * on batch.
          + * Note:
          + * This can handle two input types:
          + * 1. when partial aggregate is not supported, the input data structure of reduce is
          + * |groupKey1|groupKey2|sum1|count1|sum2|count2|rowTime|
          + * 2. when partial aggregate is supported, the input data structure of reduce is
          + * |groupKey1|groupKey2|sum1|count1|sum2|count2|windowStart|windowEnd|
          + *
          + * @param aggregates The aggregate functions.
          + * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
          + * and output Row.
          + * @param aggregateMapping The index mapping between aggregate function list and aggregated value
          + * index in output Row.
          + * @param intermediateRowArity The intermediate row field count.
          + * @param finalRowArity The output row field count.
          + * @param finalRowWindowStartPos The relative window-start field position.
          + * @param finalRowWindowEndPos The relative window-end field position.
          + * @param gap Session time window gap.
          + */
          +class DataSetSessionWindowAggregateReduceGroupFunction(
          + aggregates: Array[Aggregate[_ <: Any]],
          + groupKeysMapping: Array[(Int, Int)],
          + aggregateMapping: Array[(Int, Int)],
          + intermediateRowArity: Int,
          + finalRowArity: Int,
          + finalRowWindowStartPos: Option[Int],
          + finalRowWindowEndPos: Option[Int],
          + gap:Long)
          + extends RichGroupReduceFunction[Row, Row] {
          +
          + private var aggregateBuffer: Row = _
          + private var output: Row = _
          + private var collector: TimeWindowPropertyCollector = _
          + private var intermediateRowWindowStartPos = 0
          + private var intermediateRowWindowEndPos = 0
          +
+ override def open(config: Configuration) {
+   Preconditions.checkNotNull(aggregates)
+   Preconditions.checkNotNull(groupKeysMapping)
+   aggregateBuffer = new Row(intermediateRowArity)
+   intermediateRowWindowStartPos = intermediateRowArity - 2
+   intermediateRowWindowEndPos = intermediateRowArity - 1
+   output = new Row(finalRowArity)
+   collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos)
+ }
+
          + /**
          + * For grouped intermediate aggregate Rows, divide window according to the window-start
          + * and window-end, merge data (within a unified window) into an aggregate buffer, calculate
          + * aggregated values output from aggregate buffer, and then set them into output
          + * Row based on the mapping relationship between intermediate aggregate data and output data.
          + *
          + * @param records Grouped intermediate aggregate Rows iterator.
          + * @param out The collector to hand results to.
          + *
          + */
          + override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
          +
          + var last: Row = null
          + var head: Row = null
          + var lastWindowEnd: java.lang.Long = null
          + var currentWindowStart:java.lang.Long = null
          +
          + val iterator = records.iterator()
          +
          + while (iterator.hasNext) {
          + val record = iterator.next()
          + currentWindowStart = record.getField(intermediateRowWindowStartPos).asInstanceOf[Long]
          + // initial traversal or opening a new window
          + if (null == lastWindowEnd ||
          + (null != lastWindowEnd && currentWindowStart > lastWindowEnd)) {
          +
          + // calculate the current window and open a new window
+ if (null != lastWindowEnd) {
+   // evaluate and emit the current window's result.
+   doEvaluateAndCollect(out, last, head)
+ }
          + // initiate intermediate aggregate value.
          + aggregates.foreach(_.initiate(aggregateBuffer))
          + head = record
          + }
          +
          + aggregates.foreach(_.merge(record, aggregateBuffer))
          + last = record
          + lastWindowEnd = getWindowEnd(last)
          + }
          +
          + doEvaluateAndCollect(out, last, head)
          +
          + }
          +
          + def doEvaluateAndCollect(
          + out: Collector[Row],
          + last: Row,
          + head: Row): Unit = {
          + // set group keys value to final output.
          + groupKeysMapping.foreach {
          — End diff –

          This could be done once in the `reduce()` method using the first record obtained from the iterator (no need to remember `last` for this initialization).
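A small sketch of this point: the group-key values are constant within a group, so they can be copied into the output row once from the first record the iterator returns (`setGroupKeys` is an illustrative helper, not part of the PR):

import org.apache.flink.types.Row

object GroupKeySketch {
  // Copies the group-key fields once; every record in the group carries
  // the same key values, so tracking `last` for this purpose is unnecessary.
  def setGroupKeys(
      first: Row,
      output: Row,
      groupKeysMapping: Array[(Int, Int)]): Unit = {
    groupKeysMapping.foreach {
      case (after, previous) =>
        output.setField(after, first.getField(previous))
    }
  }
}

In `reduce()` this would run once with the first record obtained from the iterator, before the merge loop starts.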

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3150#discussion_r96821333

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala —
          @@ -218,6 +216,85 @@ class DataSetWindowAggregate(
          }
          }

          + private[this] def createEventTimeSessionWindowDataSet(
          + inputDS: DataSet[Any],
          + isParserCaseSensitive: Boolean): DataSet[Any] = {
          +
          + val groupingKeys = grouping.indices.toArray
          + val rowTypeInfo = resultRowTypeInfo
          +
          + // grouping window
          + if (groupingKeys.length > 0) {
          + //create mapFunction for initializing the aggregations
          + val mapFunction = createDataSetWindowPrepareMapFunction(
          + window,
          + namedAggregates,
          + grouping,
          + inputType,isParserCaseSensitive)
          +
          + // create groupReduceFunction for calculating the aggregations
          + val groupReduceFunction = createDataSetWindowAggregationGroupReduceFunction(
          + window,
          + namedAggregates,
          + inputType,
          + rowRelDataType,
          + grouping,
          + namedProperties)
          +
          + val mappedInput =
          + inputDS
          + .map(mapFunction)
          + .name(prepareOperatorName)
          +
          + val mapReturnType = mapFunction.asInstanceOf[ResultTypeQueryable[Row]].getProducedType
          +
          + // the position of the rowtime field in the intermediate result for map output
          + val rowTimeFilePos = mapReturnType.getArity - 1
          — End diff –

          should be `rowTimeFieldPos`?

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3150#discussion_r96825508

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala —
          @@ -226,27 +225,101 @@ object AggregateUtil {
          aggregates,
          groupingOffsetMapping,
          aggOffsetMapping,

- intermediateRowArity,
+ intermediateRowArity + 1,// the addition one field is used to store time attribute
outputType.getFieldCount)
+
+ case EventTimeSessionGroupWindow(_, _, gap) =>
+   val (startPos, endPos) = if (isTimeWindow(window)) {
— End diff –

Good catch!!

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/3150

@fhueske Thank you very much for your detailed review. Your optimization suggestions are exactly what I should pay attention to, and I will update the PR as soon as possible. Thanks again!!!

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/3150

@fhueske I have updated the PR according to your comments. The changes are as follows:
1. Fixed "remember and read input objects across method calls".
2. Amended some code formatting.
Thanks again, SunJincheng.

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/3150

          Thanks for the update @sunjincheng121. The changes look good. I will merge this.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3150

          twalthr Timo Walther added a comment -

          Fixed in 1.3.0: 1fc34502e09bfde1f510fe8c22b34795170988ca


People

  • Assignee: sunjincheng121 sunjincheng
  • Reporter: twalthr Timo Walther
  • Votes: 0
  • Watchers: 3
