Details

    • Type: Sub-task
    • Status: Closed
    • Priority: Major
    • Resolution: Implemented
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Table API & SQL
    • Labels: None

      Description

      Add non-grouped session windows for batch tables as described in FLIP-11.
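      For context, here is a minimal sketch of what this enables, loosely based on the test case discussed below (the data, gap, and field names are illustrative):

      import org.apache.flink.api.scala._
      import org.apache.flink.table.api.TableEnvironment
      import org.apache.flink.table.api.scala._

      val env = ExecutionEnvironment.getExecutionEnvironment
      val tEnv = TableEnvironment.getTableEnvironment(env)

      // a small batch table with an event-time field 'long
      val table = env
        .fromElements((1L, 1, "Hi"), (2L, 2, "Hello"), (20L, 3, "Hello world"))
        .toTable(tEnv, 'long, 'int, 'string)

      // a "non-grouped" session window: the table is grouped only by the window itself
      val result = table
        .window(Session withGap 7.milli on 'long as 'w)
        .groupBy('w)
        .select('string.count)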

        Issue Links

          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user sunjincheng121 opened a pull request:

          https://github.com/apache/flink/pull/3266

FLINK-5219 [TableAPI&SQL] Add non-grouped session windows for batch tables

          Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
          If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
          In addition to going through the list, please provide a meaningful description of your changes.

          • [×] General
          • The pull request references the related JIRA issue ("FLINK-5219[TableAPI&SQL] Add non-grouped session windows for batch tables")
          • The pull request addresses only one issue
          • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [ ] Documentation
          • Documentation has been added for new functionality
          • Old documentation affected by the pull request has been updated
          • JavaDoc for public methods has been added
          • [×] Tests & Build
          • Functionality added by the pull request is covered by tests
          • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/sunjincheng121/flink FLINK-5219-PR

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3266.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3266


          commit 207781c018a23819863f1a51e2f9fb39c8ce98b8
          Author: Jincheng Sun <sunjincheng121@gmail.com>
          Date: 2017-02-04T05:45:53Z

          FLINK-5219[TableAPI&SQL] Add non-grouped session windows for batch tables


          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/3266

@fhueske @twalthr In this PR, in order to reuse code, `DataSetSessionWindowAggregatePreProcessor` implements both the `MapPartitionFunction` and the `GroupCombineFunction` interfaces. I'm not sure if this is correct. What do you think?
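
The pattern in question, as a minimal sketch (the class name echoes the PR, but the bodies are illustrative placeholders, not the PR's actual logic):

import java.lang.Iterable
import org.apache.flink.api.common.functions.{AbstractRichFunction, GroupCombineFunction, MapPartitionFunction}
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

class SessionWindowPreProcessor extends AbstractRichFunction
  with MapPartitionFunction[Row, Row]
  with GroupCombineFunction[Row, Row] {

  // both entry points delegate to the same pre-aggregation logic
  override def mapPartition(records: Iterable[Row], out: Collector[Row]): Unit =
    preProcess(records, out)

  override def combine(records: Iterable[Row], out: Collector[Row]): Unit =
    preProcess(records, out)

  private def preProcess(records: Iterable[Row], out: Collector[Row]): Unit = {
    // the real implementation merges rows of the same session here;
    // a pass-through stands in for it in this sketch
    val it = records.iterator()
    while (it.hasNext) {
      out.collect(it.next())
    }
  }
}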

          sunjincheng121 sunjincheng added a comment -

Hi Fabian Hueske, Timo Walther, I would very much appreciate it if you could help review the PR.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3266#discussion_r99998638

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala —
@@ -19,30 +19,36 @@ package org.apache.flink.table.runtime.aggregate

 import java.lang.Iterable

-import org.apache.flink.api.common.functions.RichGroupCombineFunction
+import org.apache.flink.api.common.functions.{AbstractRichFunction, GroupCombineFunction,
+MapPartitionFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.types.Row
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.util.{Collector, Preconditions}

 /**
- * This wraps the aggregate logic inside of
- * [[org.apache.flink.api.java.operators.GroupCombineOperator]].
+ * It wraps the aggregate logic inside of
+ * [[org.apache.flink.api.java.operators.GroupCombineOperator]] and
+ * [[org.apache.flink.api.java.operators.MapPartitionOperator]] It is used for
+ * Sessiontime-window on batch.
  *
  * @param aggregates The aggregate functions.
  * @param groupingKeys The indexes of the grouping fields.
  * @param intermediateRowArity The intermediate row field count.
  * @param gap Session time window gap.
  * @param intermediateRowType Intermediate row data type.
  */
-class DataSetSessionWindowAggregateCombineGroupFunction(
+class DataSetSessionWindowAggregatePreProcessor(
   aggregates: Array[Aggregate[_ <: Any]],
   groupingKeys: Array[Int],
   intermediateRowArity: Int,
   gap: Long,
   @transient intermediateRowType: TypeInformation[Row])
-  extends RichGroupCombineFunction[Row,Row] with ResultTypeQueryable[Row] {
+  extends AbstractRichFunction
+  with MapPartitionFunction[Row,Row]
— End diff –

I would like `with` to be indented at the same level as `extends`.
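
For illustration, the requested layout would look roughly like this (a sketch, not the PR's final code):

class DataSetSessionWindowAggregatePreProcessor(/* params as above */)
  extends AbstractRichFunction
  with MapPartitionFunction[Row, Row]
  with GroupCombineFunction[Row, Row]
  with ResultTypeQueryable[Row] {
  // ...
}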

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3266#discussion_r100000375

— Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala —
@@ -141,16 +141,18 @@ class DataSetWindowAggregateITCase(configMode: TableConfigMode)
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }

-  @Test(expected = classOf[UnsupportedOperationException])
+  @Test
   def testAlldEventTimeSessionGroupWindow(): Unit = {
-    // Non-grouping Session window on event-time are currently not supported
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
     val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 'string)
     val windowedTable = table
       .window(Session withGap 7.milli on 'long as 'w)
       .groupBy('w)
-      .select('string.count).toDataSet[Row].collect()
+      .select('string.count)
+    val results = windowedTable.toDataSet[Row].collect()
+    val expected = "6\n1";
— End diff –

          remove the semicolon `;`

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3266#discussion_r99996840

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala —
@@ -291,13 +292,13 @@ object AggregateUtil {
       inputType,
       Option(Array(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)))

-    new DataSetSessionWindowAggregateCombineGroupFunction(
+    new DataSetSessionWindowAggregatePreProcessor(
       aggregates,
       groupings,
       // the addition two fields are used to store window-start and window-end attributes
       intermediateRowArity + 2,
       asLong(gap),
-      combineReturnType)
+      combineReturnType).asInstanceOf[GroupCombineFunction[Row,Row]]
— End diff –

Can we leave out the `asInstanceOf`?
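
For what it's worth, the cast is indeed unnecessary once the class implements the target interface, since the upcast is implicit; a sketch using the names from the diff above:

// no asInstanceOf needed: the subtype-to-interface upcast is implicit
val combiner: GroupCombineFunction[Row, Row] =
  new DataSetSessionWindowAggregatePreProcessor(
    aggregates,
    groupings,
    intermediateRowArity + 2,
    asLong(gap),
    combineReturnType)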

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3266#discussion_r99997022

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala —
@@ -306,6 +307,85 @@ object AggregateUtil {
   }

   /**
+   * Create a [[org.apache.flink.api.common.functions.MapPartitionFunction]] that aggregation
+   * for aggregates.
+   * The function returns aggregate values of all aggregate function which are
+   * organized by the following format:
+   *
+   * {{{
+   *     avg(x) aggOffsetInRow = 2      count(z) aggOffsetInRow = 5
+   *       |                              |    windowEnd(max(rowtime)
+   *       |                              |       |
+   *       v                              v       v
+   *    +--------+--------+--------+--------+-----------+---------+
+   *    |  sum1  | count1 |  sum2  | count2 |windowStart|windowEnd|
+   *    +--------+--------+--------+--------+-----------+---------+
+   *                 ^                 ^
+   *                 |                 |
+   *       sum(y) aggOffsetInRow = 4   windowStart(min(rowtime))
+   * }}}
+   *
+   */
+  def createDataSetWindowAggregationMapPartitionFunction(
+      window: LogicalWindow,
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      inputType: RelDataType,
+      outputType: RelDataType = null,
+      properties: Seq[NamedWindowProperty] = null,
+      isPreMapPartition: Boolean = true,
+      isInputCombined: Boolean = false): MapPartitionFunction[Row, Row] = {
+
+    val aggregates = transformToAggregateFunctions(
+      namedAggregates.map(_.getKey),
+      inputType,
+      0)._2
+
+    val intermediateRowArity = aggregates.map(_.intermediateDataType.length).sum
+
+    window match {
+      case EventTimeSessionGroupWindow(_, _, gap) =>
+        if (isPreMapPartition) {
+          val preMapReturnType: RowTypeInfo =
+            createAggregateBufferDataType(
+              Array(),
+              aggregates,
+              inputType,
+              Option(Array(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)))
+
+          new DataSetSessionWindowAggregatePreProcessor(
+            aggregates,
+            Array(),
+            // the addition two fields are used to store window-start and window-end attributes
+            intermediateRowArity + 2,
+            asLong(gap),
+            preMapReturnType).asInstanceOf[MapPartitionFunction[Row, Row]]
— End diff –

Can we leave out the `asInstanceOf`?

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3266#discussion_r99987780

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala —
@@ -271,19 +269,46 @@ class DataSetWindowAggregate(
       isInputCombined = true)

     mappedInput
-      .groupBy(groupingKeys: _*)
-      .sortGroup(rowTimeFieldPos, Order.ASCENDING)
-      .combineGroup(combineGroupFunction)
-      .groupBy(groupingKeys: _*)
-      .sortGroup(windowStartPos, Order.ASCENDING)
-      .sortGroup(windowEndPos, Order.ASCENDING)
-      .reduceGroup(groupReduceFunction)
-      .returns(rowTypeInfo)
-      .name(aggregateOperatorName)
-      .asInstanceOf[DataSet[Any]]
+        .groupBy(groupingKeys: _*)
+        .sortGroup(rowTimeFieldPos, Order.ASCENDING)
+        .combineGroup(combineGroupFunction)
+        .groupBy(groupingKeys: _*)
+        .sortGroup(windowStartPos, Order.ASCENDING)
+        .sortGroup(windowEndPos, Order.ASCENDING)
+        .reduceGroup(groupReduceFunction)
+        .returns(rowTypeInfo)
+        .name(aggregateOperatorName)
+        .asInstanceOf[DataSet[Any]]
+
+    } else {
+      // non-grouping window
+      val preMapPartitionFunction = createDataSetWindowAggregationMapPartitionFunction(
+        window,
+        namedAggregates,
+        inputType)
+
+      val mapPartitionFunction = createDataSetWindowAggregationMapPartitionFunction(
+        window,
+        namedAggregates,
+        inputType,
+        rowRelDataType,
+        namedProperties,
+        isPreMapPartition = false,
+        isInputCombined = true)
+
+      mappedInput.sortPartition(rowTimeFieldPos, Order.ASCENDING).setParallelism(1)
+        .mapPartition(preMapPartitionFunction).setParallelism(1)
— End diff –

Please put each chained operator call on its own line.

I think the first `sortPartition` and the first `mapPartition` can run in parallel and do not need `setParallelism(1)`. Otherwise, the performance will be worse than the non-incremental, non-grouping aggregation approach.

What do you think?
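
In other words, the suggested plan shape would be roughly (a sketch; the names come from the diff above):

mappedInput
  .sortPartition(rowTimeFieldPos, Order.ASCENDING)   // can stay parallel
  .mapPartition(preMapPartitionFunction)             // parallel pre-aggregation
  .sortPartition(windowStartPos, Order.ASCENDING).setParallelism(1)
  .sortPartition(windowEndPos, Order.ASCENDING).setParallelism(1)
  .mapPartition(mapPartitionFunction).setParallelism(1)  // single final pass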

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3266#discussion_r99987789

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala —
@@ -299,13 +324,24 @@ class DataSetWindowAggregate(
       .returns(rowTypeInfo)
       .name(aggregateOperatorName)
       .asInstanceOf[DataSet[Any]]
+
+    } else {
+      // non-grouping window
+      val mapPartitionFunction = createDataSetWindowAggregationMapPartitionFunction(
+        window,
+        namedAggregates,
+        inputType,
+        rowRelDataType,
+        namedProperties,
+        isPreMapPartition = false)
+
+      mappedInput.sortPartition(rowTimeFieldPos, Order.ASCENDING).setParallelism(1)
+        .mapPartition(mapPartitionFunction).setParallelism(1)
— End diff –

          indent

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3266#discussion_r99997024

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala —
@@ -306,6 +307,85 @@ object AggregateUtil {
   }

   /**
+   * Create a [[org.apache.flink.api.common.functions.MapPartitionFunction]] that aggregation
+   * for aggregates.
+   * The function returns aggregate values of all aggregate function which are
+   * organized by the following format:
+   *
+   * {{{
+   *     avg(x) aggOffsetInRow = 2      count(z) aggOffsetInRow = 5
+   *       |                              |    windowEnd(max(rowtime)
+   *       |                              |       |
+   *       v                              v       v
+   *    +--------+--------+--------+--------+-----------+---------+
+   *    |  sum1  | count1 |  sum2  | count2 |windowStart|windowEnd|
+   *    +--------+--------+--------+--------+-----------+---------+
+   *                 ^                 ^
+   *                 |                 |
+   *       sum(y) aggOffsetInRow = 4   windowStart(min(rowtime))
+   * }}}
+   *
+   */
+  def createDataSetWindowAggregationMapPartitionFunction(
+      window: LogicalWindow,
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      inputType: RelDataType,
+      outputType: RelDataType = null,
+      properties: Seq[NamedWindowProperty] = null,
+      isPreMapPartition: Boolean = true,
+      isInputCombined: Boolean = false): MapPartitionFunction[Row, Row] = {
+
+    val aggregates = transformToAggregateFunctions(
+      namedAggregates.map(_.getKey),
+      inputType,
+      0)._2
+
+    val intermediateRowArity = aggregates.map(_.intermediateDataType.length).sum
+
+    window match {
+      case EventTimeSessionGroupWindow(_, _, gap) =>
+        if (isPreMapPartition) {
+          val preMapReturnType: RowTypeInfo =
+            createAggregateBufferDataType(
+              Array(),
+              aggregates,
+              inputType,
+              Option(Array(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)))
+
+          new DataSetSessionWindowAggregatePreProcessor(
+            aggregates,
+            Array(),
+            // the addition two fields are used to store window-start and window-end attributes
+            intermediateRowArity + 2,
+            asLong(gap),
+            preMapReturnType).asInstanceOf[MapPartitionFunction[Row, Row]]
+
+        } else {
+          val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+
+          // the mapping relation between aggregate function index in list and its corresponding
+          // field index in output Row.
+          val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
+
+          new DataSetSessionWindowAggregateProcessor(
+            aggregates,
+            Array(),
+            aggOffsetMapping,
+            // the additional two fields are used to store window-start and window-end attributes
+            intermediateRowArity + 2,
+            outputType.getFieldCount,
+            startPos,
+            endPos,
+            asLong(gap),
+            isInputCombined).asInstanceOf[MapPartitionFunction[Row, Row]]
— End diff –

Can we leave out the `asInstanceOf`?

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3266#discussion_r99998655

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateProcessor.scala —
@@ -59,7 +61,9 @@ class DataSetSessionWindowAggregateReduceGroupFunction(
     finalRowWindowEndPos: Option[Int],
     gap:Long,
     isInputCombined: Boolean)
-  extends RichGroupReduceFunction[Row, Row] {
+  extends AbstractRichFunction
+  with MapPartitionFunction[Row, Row]
— End diff –

I would like `with` to be indented at the same level as `extends`.

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3266#discussion_r100001509

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala —
@@ -210,14 +210,14 @@ object AggregateUtil {
     else {
       // for non-incremental aggregations
       new DataSetTumbleTimeWindowAggReduceGroupFunction(
-        intermediateRowArity - 1,
+        intermediateRowArity,
         asLong(size),
         startPos,
         endPos,
         aggregates,
         groupingOffsetMapping,
         aggOffsetMapping,
-        intermediateRowArity,
+        intermediateRowArity + 1, // the additional field is used to store the time attribute
— End diff –

Good catch!

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/3266

@wuchong thanks for reviewing the PR. Your comments make sense to me. I have updated the PR.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3266#discussion_r101013750

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateProcessor.scala —
@@ -19,15 +19,17 @@ package org.apache.flink.table.runtime.aggregate

 import java.lang.Iterable

-import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.api.common.functions.{AbstractRichFunction, GroupReduceFunction,
+MapPartitionFunction}
— End diff –

          do not break the line

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3266#discussion_r101014526

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala —
@@ -19,30 +19,36 @@ package org.apache.flink.table.runtime.aggregate

 import java.lang.Iterable

-import org.apache.flink.api.common.functions.RichGroupCombineFunction
+import org.apache.flink.api.common.functions.{AbstractRichFunction, GroupCombineFunction,
+MapPartitionFunction}
— End diff –

          No line break. Imports may exceed the 100 char limit.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3266#discussion_r101016814

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala —
@@ -306,6 +307,85 @@ object AggregateUtil {
   }

   /**
+   * Create a [[org.apache.flink.api.common.functions.MapPartitionFunction]] that aggregation
+   * for aggregates.
+   * The function returns aggregate values of all aggregate function which are
+   * organized by the following format:
+   *
+   * {{{
+   *     avg(x) aggOffsetInRow = 2      count(z) aggOffsetInRow = 5
+   *       |                              |    windowEnd(max(rowtime)
+   *       |                              |       |
+   *       v                              v       v
+   *    +--------+--------+--------+--------+-----------+---------+
+   *    |  sum1  | count1 |  sum2  | count2 |windowStart|windowEnd|
+   *    +--------+--------+--------+--------+-----------+---------+
+   *                 ^                 ^
+   *                 |                 |
+   *       sum(y) aggOffsetInRow = 4   windowStart(min(rowtime))
+   * }}}
+   *
+   */
+  def createDataSetWindowAggregationMapPartitionFunction(
+      window: LogicalWindow,
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      inputType: RelDataType,
+      outputType: RelDataType = null,
+      properties: Seq[NamedWindowProperty] = null,
+      isPreMapPartition: Boolean = true,
+      isInputCombined: Boolean = false): MapPartitionFunction[Row, Row] = {
+
+    val aggregates = transformToAggregateFunctions(
+      namedAggregates.map(_.getKey),
+      inputType,
+      0)._2
+
+    val intermediateRowArity = aggregates.map(_.intermediateDataType.length).sum
+
+    window match {
+      case EventTimeSessionGroupWindow(_, _, gap) =>
+        if (isPreMapPartition) {
+          val preMapReturnType: RowTypeInfo =
+            createAggregateBufferDataType(
+              Array(),
+              aggregates,
+              inputType,
+              Option(Array(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)))
+
+          new DataSetSessionWindowAggregatePreProcessor(
+            aggregates,
+            Array(),
+            // the addition two fields are used to store window-start and window-end attributes
— End diff –

          the addition*al* two fields

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3266#discussion_r101014900

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala —
@@ -281,9 +279,36 @@ class DataSetWindowAggregate(
       .returns(rowTypeInfo)
       .name(aggregateOperatorName)
       .asInstanceOf[DataSet[Any]]
+
+    } else {
+      // non-grouping window
+      val preMapPartitionFunction = createDataSetWindowAggregationMapPartitionFunction(
+        window,
+        namedAggregates,
+        inputType)
+
+      val mapPartitionFunction = createDataSetWindowAggregationMapPartitionFunction(
+        window,
+        namedAggregates,
+        inputType,
+        rowRelDataType,
+        namedProperties,
+        isPreMapPartition = false,
+        isInputCombined = true)
+
+      mappedInput.sortPartition(rowTimeFieldPos, Order.ASCENDING)
+        .mapPartition(preMapPartitionFunction)
+        .sortPartition(windowStartPos, Order.ASCENDING).setParallelism(1)
+        .sortPartition(windowEndPos, Order.ASCENDING).setParallelism(1)
+        .mapPartition(mapPartitionFunction).setParallelism(1)
— End diff –

          I think we can also use `.reduceGroup()` and a `GroupReduceFunction` here. Without `groupBy`, the `GroupReduceFunction` will be executed with parallelism 1.
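
A sketch of that alternative (`finalSessionAggregator` is a hypothetical GroupReduceFunction variant of the final aggregator, not the PR's actual name):

mappedInput
  .sortPartition(rowTimeFieldPos, Order.ASCENDING)  // still parallel
  .mapPartition(preMapPartitionFunction)            // parallel pre-aggregation
  .reduceGroup(finalSessionAggregator)              // no groupBy: runs with parallelism 1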

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3266#discussion_r101014280

— Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala —
@@ -141,16 +141,18 @@ class DataSetWindowAggregateITCase(configMode: TableConfigMode)
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }

-  @Test(expected = classOf[UnsupportedOperationException])
+  @Test
   def testAlldEventTimeSessionGroupWindow(): Unit = {
-    // Non-grouping Session window on event-time are currently not supported
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
     val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 'string)
     val windowedTable = table
       .window(Session withGap 7.milli on 'long as 'w)
       .groupBy('w)
-      .select('string.count).toDataSet[Row].collect()
+      .select('string.count)
+    val results = windowedTable.toDataSet[Row].collect()
+    val expected = "6\n1"
— End diff –

It would be good to change the query so that it checks that multiple sessions are correctly computed.
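
For example, selecting the window properties would make each session visible in the output (a sketch; the expected values assume the existing test data and 7ms gap):

val windowedTable = table
  .window(Session withGap 7.milli on 'long as 'w)
  .groupBy('w)
  .select('w.start, 'w.end, 'string.count)
// two sessions are expected, so the result should contain
// two distinct (start, end, count) rows with counts 6 and 1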

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3266#discussion_r101015171

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateProcessor.scala —
@@ -59,7 +61,9 @@ class DataSetSessionWindowAggregateReduceGroupFunction(
     finalRowWindowEndPos: Option[Int],
     gap:Long,
     isInputCombined: Boolean)
-  extends RichGroupReduceFunction[Row, Row] {
+  extends AbstractRichFunction
+  with MapPartitionFunction[Row, Row]
— End diff –

          I like the idea of implementing a joint `MapPartition` and `GroupReduce` function. However, I think it is not necessary for the final aggregation. We can also call `DataSet.reduceGroup()` which will do the same as `DataSet.mapPartition().parallelism(1)`.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3266#discussion_r101014879

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala —
@@ -294,18 +319,29 @@ class DataSetWindowAggregate(
       namedProperties)

     mappedInput.groupBy(groupingKeys: _*)
-      .sortGroup(rowTimeFieldPos, Order.ASCENDING)
-      .reduceGroup(groupReduceFunction)
-      .returns(rowTypeInfo)
-      .name(aggregateOperatorName)
-      .asInstanceOf[DataSet[Any]]
+        .sortGroup(rowTimeFieldPos, Order.ASCENDING)
+        .reduceGroup(groupReduceFunction)
+        .returns(rowTypeInfo)
+        .name(aggregateOperatorName)
+        .asInstanceOf[DataSet[Any]]
+
+    } else {
+      // non-grouping window
+      val mapPartitionFunction = createDataSetWindowAggregationMapPartitionFunction(
+        window,
+        namedAggregates,
+        inputType,
+        rowRelDataType,
+        namedProperties,
+        isPreMapPartition = false)
+
+      mappedInput.sortPartition(rowTimeFieldPos, Order.ASCENDING).setParallelism(1)
+        .mapPartition(mapPartitionFunction).setParallelism(1)
— End diff –

          I think we can also use `.reduceGroup()` and a `GroupReduceFunction` here. Without `groupBy`, the `GroupReduceFunction` will be executed with parallelism 1.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3266#discussion_r101016955

— Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala —
@@ -306,6 +307,85 @@ object AggregateUtil {
   }

   /**
+   * Create a [[org.apache.flink.api.common.functions.MapPartitionFunction]] that aggregation
+   * for aggregates.
+   * The function returns aggregate values of all aggregate function which are
+   * organized by the following format:
+   *
+   * {{{
+   *     avg(x) aggOffsetInRow = 2      count(z) aggOffsetInRow = 5
+   *       |                              |    windowEnd(max(rowtime)
+   *       |                              |       |
+   *       v                              v       v
+   *    +--------+--------+--------+--------+-----------+---------+
+   *    |  sum1  | count1 |  sum2  | count2 |windowStart|windowEnd|
+   *    +--------+--------+--------+--------+-----------+---------+
+   *                 ^                 ^
+   *                 |                 |
+   *       sum(y) aggOffsetInRow = 4   windowStart(min(rowtime))
+   * }}}
+   *
+   */
+  def createDataSetWindowAggregationMapPartitionFunction(
+      window: LogicalWindow,
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      inputType: RelDataType,
+      outputType: RelDataType = null,
+      properties: Seq[NamedWindowProperty] = null,
+      isPreMapPartition: Boolean = true,
+      isInputCombined: Boolean = false): MapPartitionFunction[Row, Row] = {
+
+    val aggregates = transformToAggregateFunctions(
+      namedAggregates.map(_.getKey),
+      inputType,
+      0)._2
+
+    val intermediateRowArity = aggregates.map(_.intermediateDataType.length).sum
+
+    window match {
+      case EventTimeSessionGroupWindow(_, _, gap) =>
+        if (isPreMapPartition) {
+          val preMapReturnType: RowTypeInfo =
+            createAggregateBufferDataType(
+              Array(),
+              aggregates,
+              inputType,
+              Option(Array(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)))
+
+          new DataSetSessionWindowAggregatePreProcessor(
+            aggregates,
+            Array(),
+            // the addition two fields are used to store window-start and window-end attributes
+            intermediateRowArity + 2,
+            asLong(gap),
+            preMapReturnType)
+
+        } else {
+          val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
— End diff –

          We do not need this case if we compute the final aggregates with a `GroupReduceFunction`.

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/3266

@fhueske thanks for the review. You are right. I need to rebase the code and update the PR according to your comments.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3266

          Hi @sunjincheng121, any news for this PR?
          Thanks, Fabian

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/3266

Hi @fhueske, I'm sorry for the late reply, and thanks a lot for the reminder. I have rebased the code and updated the PR according to your comments.
Best,
Sun Jincheng

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3266

          Thanks for the update @sunjincheng121.
          IMO the PR is good to merge.

I might wait until PR #3423, which touches all aggregation functions, has been merged.
So we might need to rebase this PR one more time, but I think this should not be too much work.

          Thanks, Fabian

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/3266

          @fhueske Thanks for your quick review, and sounds good.

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/3266

Hi @fhueske, I have rebased the code on top of PR #3423's commit.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3266

          Thanks for the update @sunjincheng121.
          Merging

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3266

          fhueske Fabian Hueske added a comment -

          Implemented with 728c936dd2ac18701e1d8696da251aec351b2ae6


People

• Assignee: sunjincheng
• Reporter: sunjincheng
• Votes: 0
• Watchers: 3
