Flink / FLINK-5804 (sub-task of FLINK-4557: Table API Stream Aggregations)

Add [non-partitioned] processing time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

    Details

    • Type: Sub-task
    • Status: Closed
    • Priority: Major
    • Resolution: Implemented
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Table API & SQL
    • Labels:
      None

      Description

      The goal of this issue is to add support for OVER RANGE aggregations on processing time streams to the SQL interface.

      Queries similar to the following should be supported:

      SELECT 
        a, 
        SUM(b) OVER (ORDER BY procTime() RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS sumB,
        MIN(b) OVER (ORDER BY procTime() RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS minB
      FROM myStream
      

      The following restrictions should initially apply:

      • All OVER clauses in the same SELECT clause must be exactly the same.
      • Since no PARTITION BY clause is specified, the execution will be single-threaded.
      • The ORDER BY clause may only reference procTime(), a parameterless scalar function that simply indicates processing-time mode.
      • Bounded PRECEDING is not supported (see FLINK-5654).
      • FOLLOWING is not supported.

      The restrictions will be resolved in follow-up issues. If we find that some of the restrictions are trivial to address, we can add the functionality in this issue as well.
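      For orientation, here is a sketch of how such a query could be submitted through the Table API once this issue is implemented. It is illustrative only: the stream registration, field names, and sample data are assumptions, and procTime() follows the notation used in this issue.

      ```
      import org.apache.flink.streaming.api.scala._
      import org.apache.flink.table.api.TableEnvironment
      import org.apache.flink.table.api.scala._

      // Sketch: register a stream as table "myStream" and run the
      // unbounded processing-time OVER RANGE query from the description.
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      val tEnv = TableEnvironment.getTableEnvironment(env)

      val input = env.fromElements(("x", 1L), ("y", 2L), ("x", 3L))
      tEnv.registerDataStream("myStream", input, 'a, 'b)

      val result = tEnv.sql(
        """SELECT a,
          |  SUM(b) OVER (ORDER BY procTime()
          |    RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS sumB
          |FROM myStream""".stripMargin)
      ```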

      This issue includes:

      • Design of the DataStream operator to compute OVER RANGE aggregates (a rough sketch follows below)
      • Translation from Calcite's RelNode representation (LogicalProject with RexOver expression).
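
      For orientation, a minimal sketch of the shape of the translated DataStream program for the non-partitioned case, using the names that appear in the discussion below (`inputDS`, `processFunction`, and `rowTypeInfo` are assumed to be in scope):

      ```
      // Sketch only: a single ProcessFunction holds the accumulators and emits
      // the forwarded input fields plus the updated aggregate values.
      val result = inputDS
        .process(processFunction)  // unbounded processing-time OVER aggregation
        .setParallelism(1)         // no PARTITION BY, so a single task
        .setMaxParallelism(1)      // prevent later rescaling of the operator
        .returns(rowTypeInfo)      // declare the produced Row type
      ```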

        Issue Links

          Activity

          sunjincheng added a comment -

          Hi guys, I made a preliminary implementation of this JIRA.
          My approach is:
          1. Calcite -> Flink:
          "LogicalProject with RexOver expression" -> (normalize rule) -> Calcite's LogicalWindow -> (opt rule) -> DataStreamRowWindowAggregate
          2. DataStream API (without PARTITION BY):
          inputDS.map().setParallelism(1), where the map function implements CheckpointedFunction (a sketch follows below).
          3. About ORDER BY:
          Elements are processed in their natural arrival order; procTime() is only used to generate the end time of the window and to pass SQL validation.
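
          A hypothetical sketch of step 2, hard-coded to a single running SUM over field 1 of the input Row so it stays self-contained (class and state names are made up); the thread later replaces this map-based design with a ProcessFunction:

          ```
          import org.apache.flink.api.common.functions.RichMapFunction
          import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
          import org.apache.flink.api.common.typeinfo.BasicTypeInfo
          import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
          import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
          import org.apache.flink.types.Row

          // Hypothetical sketch: a parallelism-1 map whose running aggregate
          // survives checkpoints via CheckpointedFunction.
          class SumOverMapFunction extends RichMapFunction[Row, Row] with CheckpointedFunction {

            private var sum: Long = 0L
            private var state: ListState[java.lang.Long] = _

            override def map(input: Row): Row = {
              sum += input.getField(1).asInstanceOf[Long]  // accumulate field b
              val out = new Row(2)
              out.setField(0, input.getField(0))           // forward field a
              out.setField(1, sum)                         // emit the running sum
              out
            }

            override def snapshotState(ctx: FunctionSnapshotContext): Unit = {
              state.clear()
              state.add(sum)                               // checkpoint the running sum
            }

            override def initializeState(ctx: FunctionInitializationContext): Unit = {
              val serializer = BasicTypeInfo.LONG_TYPE_INFO
                .createSerializer(getRuntimeContext.getExecutionConfig)
              val descriptor = new ListStateDescriptor[java.lang.Long]("sumOverState", serializer)
              state = ctx.getOperatorStateStore.getOperatorState(descriptor)
              val it = state.get().iterator()
              if (it.hasNext) {
                sum = it.next()                            // restore after a failure
              }
            }
          }
          ```

          It would be used as `inputDS.map(new SumOverMapFunction).setParallelism(1)`, matching the plan described above.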

          sunjincheng added a comment -

          After ProcessFunction was allowed on non-keyed streams, the implementation of this issue became much simpler.

          Fabian Hueske added a comment -

          Non-keyed ProcessFunction might be available soon.
          Have a look at PR #3438.

          sunjincheng added a comment -

          Fabian Hueske IMO, once PR #3438 is merged, we can share `UnboundedProcessingOverProcessFunction` with the partitioned case. Then we do not need to implement CheckpointedFunction ourselves. What do you think?

          sunjincheng added a comment -

          Haha, Fabian Hueske, we posted our messages at the same time.

          Fabian Hueske added a comment -

          Sounds good to me.

          sunjincheng added a comment -

          Hi Fabian Hueske, I used a ProcessFunction for a simple implementation, shaped like:

           inputDS
              .process(processFunction)
              .map(mapFunction).setParallelism(1)
              .returns(rowTypeInfo)
          

          Does that make sense to you? If so, I will open the PR.

          Fabian Hueske added a comment -

          I think the process function should also run with parallelism 1.
          What's the purpose of the map function? Couldn't the logic be merged into the process function?

          Thanks, Fabian

          sunjincheng added a comment -

          Fabian Hueske Yes, I was thinking about it wrong; a ProcessFunction is enough.
          Tomorrow I will open this PR!
          Best,
          SunJincheng

          ASF GitHub Bot added a comment -

          GitHub user sunjincheng121 opened a pull request:

          https://github.com/apache/flink/pull/3491

          FLINK-5804 [table] Add support for procTime non-partitioned OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL.

          Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
          If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
          In addition to going through the list, please provide a meaningful description of your changes.

          • [×] General
          • The pull request references the related JIRA issue ("FLINK-5804 Add [non-partitioned] processing time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL")
          • The pull request addresses only one issue
          • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [ ] Documentation
          • Documentation has been added for new functionality
          • Old documentation affected by the pull request has been updated
          • JavaDoc for public methods has been added
          • [×] Tests & Build
          • Functionality added by the pull request is covered by tests
          • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/sunjincheng121/flink FLINK-5804-PR

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3491.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3491


          commit 73ba8bf78b4b283355c14ce5752296d9c131d478
          Author: 金竹 <jincheng.sunjc@alibaba-inc.com>
          Date: 2017-03-08T02:52:43Z

          FLINK-5804 [table] Add support for procTime non-partitioned OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL.


          ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3491#discussion_r104874462

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala —

          @@ -143,10 +143,18 @@ class DataStreamOverAggregate(
                   .name(aggOpName)
                   .asInstanceOf[DataStream[Row]]
               }
          -    // global non-partitioned aggregation
          +    // non-partitioned aggregation
               else {
          -      throw TableException(
          -        "Non-partitioned processing time OVER aggregation is not supported yet.")
          +      val processFunction = AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
          +        namedAggregates,
          +        inputType,
          +        false)
          +
          +      inputDS
          +        .process(processFunction).setParallelism(1)

          — End diff —

          Also set the max parallelism to prevent the parallelism from being increased.
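
          A minimal sketch of the suggested change, assuming `inputDS` and `processFunction` from the diff above:

          ```
          // Sketch: pin both values so the non-partitioned operator can never
          // be rescaled, which would split its operator state.
          inputDS
            .process(processFunction)
            .setParallelism(1)
            .setMaxParallelism(1)
          ```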

          ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3491#discussion_r104874829

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala —

          @@ -0,0 +1,107 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
          + * distributed under the License is distributed on an "AS IS" BASIS,
          + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
          + * See the License for the specific language governing permissions and
          + * limitations under the License.
          + */
          +package org.apache.flink.table.runtime.aggregate
          +
          +import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
          +import org.apache.flink.api.java.typeutils.RowTypeInfo
          +import org.apache.flink.configuration.Configuration
          +import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
          +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
          +import org.apache.flink.streaming.api.functions.ProcessFunction
          +import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
          +import org.apache.flink.types.Row
          +import org.apache.flink.util.{Collector, Preconditions}
          +
          +/**
          + * Process Function used for the aggregate in
          + * [[org.apache.flink.streaming.api.datastream.DataStream]]
          + *
          + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
          + *                   used for this aggregation
          + * @param aggFields the position (in the input Row) of the input value for each aggregate
          + */
          +class UnboundedNonPartitionedProcessingOverProcessFunction(
          +    private val aggregates: Array[AggregateFunction[_]],
          +    private val aggFields: Array[Int],
          +    private val forwardedFieldCount: Int,
          +    private val aggregationStateType: RowTypeInfo)
          +  extends ProcessFunction[Row, Row] with CheckpointedFunction {
          +
          +  Preconditions.checkNotNull(aggregates)
          +  Preconditions.checkNotNull(aggFields)
          +  Preconditions.checkArgument(aggregates.length == aggFields.length)
          +
          +  private var accumulators: Row = _
          +  private var output: Row = _
          +  private var state: ListState[Row] = null
          +
          +  override def open(config: Configuration) {
          +    output = new Row(forwardedFieldCount + aggregates.length)
          +  }
          +
          +  override def processElement(
          +      input: Row,
          +      ctx: ProcessFunction[Row, Row]#Context,
          +      out: Collector[Row]): Unit = {
          +
          +    if (null == accumulators) {

          — End diff —

          Can we move this into `open()`?

          ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3491#discussion_r104875937

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala —

          @@ -94,15 +94,18 @@ object AggregateUtil {
             }

             /**
          -   * Create an [[ProcessFunction]] to evaluate final aggregate value.
          +   * Create an [[org.apache.flink.streaming.api.functions.ProcessFunction]] to evaluate final
          +   * aggregate value.
              *
              * @param namedAggregates List of calls to aggregate functions and their output field names
              * @param inputType Input row type
          -   * @return [[UnboundedProcessingOverProcessFunction]]
          +   * @param isPartitioned It is a tag that Indicates whether the data has partitioned

          — End diff —

          `Flag to indicate whether the input is partitioned or not`

          ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3491#discussion_r104875504

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala —

          +    if (null == accumulators) {
          +      val it = state.get().iterator()
          +      if (it.hasNext) {
          +        accumulators = it.next()
          +      } else {
          +        accumulators = new Row(aggregates.length)
          +        var i = 0
          +        while (i < aggregates.length) {
          +          accumulators.setField(i, aggregates(i).createAccumulator())
          +          i += 1
          +        }
          +      }
          +    }
          +
          +    var i = 0
          +    while (i < forwardedFieldCount) {
          +      output.setField(i, input.getField(i))
          +      i += 1
          +    }
          +
          +    i = 0
          +    while (i < aggregates.length) {
          +      val index = forwardedFieldCount + i
          +      val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
          +      aggregates(i).accumulate(accumulator, input.getField(aggFields(i)))
          +      output.setField(index, aggregates(i).getValue(accumulator))
          +      i += 1
          +    }
          +
          +    out.collect(output)
          +  }
          +
          +  override def snapshotState(context: FunctionSnapshotContext): Unit = {
          +    state.clear()
          +    if (null != accumulators) {
          +      state.add(accumulators)
          +    }
          +  }
          +
          +  override def initializeState(context: FunctionInitializationContext): Unit = {
          +    val stateSerializer =
          +      aggregationStateType.createSerializer(getRuntimeContext.getExecutionConfig)
          +    val accumulatorsDescriptor = new ListStateDescriptor[Row]("overState", stateSerializer)

          — End diff —

          We can create the `ListStateDescriptor` with the `aggregationStateType` instead of a serializer.
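
          A minimal sketch of the suggested alternative, assuming `aggregationStateType: RowTypeInfo` from the constructor above; as the follow-up comments report, this variant failed at runtime at the time and was tracked in FLINK-5995:

          ```
          // Sketch: let Flink derive the serializer from the TypeInformation
          // instead of creating it eagerly from the execution config.
          val accumulatorsDescriptor =
            new ListStateDescriptor[Row]("overState", aggregationStateType)
          ```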

          ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3491#discussion_r104879219

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala —

          +  override def initializeState(context: FunctionInitializationContext): Unit = {
          +    val stateSerializer =
          +      aggregationStateType.createSerializer(getRuntimeContext.getExecutionConfig)
          +    val accumulatorsDescriptor = new ListStateDescriptor[Row]("overState", stateSerializer)

          — End diff —

          When you reviewed FLINK-5803 you told me to use `TypeInformation` instead of `TypeSerializer`. I tried that, but unfortunately it threw an exception, so I used a TypeSerializer to create the `ListStateDescriptor`. The exception info:
          ```
          Caused by: java.lang.IllegalStateException: Serializer not yet initialized.
          at org.apache.flink.api.common.state.StateDescriptor.getSerializer(StateDescriptor.java:169)
          at org.apache.flink.api.common.state.ListStateDescriptor.getElementSerializer(ListStateDescriptor.java:93)
          at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getOperatorState(DefaultOperatorStateBackend.java:110)
          at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getOperatorState(DefaultOperatorStateBackend.java:91)
          at org.apache.flink.table.runtime.aggregate.UnboundedNonPartitionedProcessingOverProcessFunction.initializeState(UnboundedNonPartitionedProcessingOverProcessFunction.scala:104)
          at org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
          ```
          Do you want me to fix this and use `aggregationStateType`? If so, I'm glad to try.

          ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3491#discussion_r104879651

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala —

          +  override def initializeState(context: FunctionInitializationContext): Unit = {
          +    val stateSerializer =
          +      aggregationStateType.createSerializer(getRuntimeContext.getExecutionConfig)
          +    val accumulatorsDescriptor = new ListStateDescriptor[Row]("overState", stateSerializer)

          — End diff —

          OK, let's keep the serializer approach. Can you open a JIRA issue and report the problem when creating the `ListStateDescriptor` with a `TypeInformation`? Thanks

          ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3491#discussion_r104883119

          — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala —
          @@ -0,0 +1,107 @@
          +/*
          + * Licensed to the Apache Software Foundation (ASF) under one
          + * or more contributor license agreements. See the NOTICE file
          + * distributed with this work for additional information
          + * regarding copyright ownership. The ASF licenses this file
          + * to you under the Apache License, Version 2.0 (the
          + * "License"); you may not use this file except in compliance
          + * with the License. You may obtain a copy of the License at
          + *
          + * http://www.apache.org/licenses/LICENSE-2.0
          + *
          + * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.table.runtime.aggregate

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
import org.apache.flink.types.Row
import org.apache.flink.util.{Collector, Preconditions}

/**
 * ProcessFunction that computes the unbounded OVER aggregation on a non-partitioned
 * [[org.apache.flink.streaming.api.datastream.DataStream]].
 *
 * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
 *                   used for this aggregation
 * @param aggFields the position (in the input Row) of the input value for each aggregate
 * @param forwardedFieldCount the number of input fields forwarded unchanged to the output
 * @param aggregationStateType the row type information of the accumulator state
 */
class UnboundedNonPartitionedProcessingOverProcessFunction(
    private val aggregates: Array[AggregateFunction[_]],
    private val aggFields: Array[Int],
    private val forwardedFieldCount: Int,
    private val aggregationStateType: RowTypeInfo)
  extends ProcessFunction[Row, Row]
  with CheckpointedFunction {

  Preconditions.checkNotNull(aggregates)
  Preconditions.checkNotNull(aggFields)
  Preconditions.checkArgument(aggregates.length == aggFields.length)

  private var accumulators: Row = _
  private var output: Row = _
  private var state: ListState[Row] = null

  override def open(config: Configuration): Unit = {
    output = new Row(forwardedFieldCount + aggregates.length)
  }

  override def processElement(
      input: Row,
      ctx: ProcessFunction[Row, Row]#Context,
      out: Collector[Row]): Unit = {

    // Restore the accumulator row from state after recovery, or create
    // fresh accumulators on the very first element.
    if (null == accumulators) {
      val it = state.get().iterator()
      if (it.hasNext) {
        accumulators = it.next()
      } else {
        accumulators = new Row(aggregates.length)
        var i = 0
        while (i < aggregates.length) {
          accumulators.setField(i, aggregates(i).createAccumulator())
          i += 1
        }
      }
    }

    // Copy the forwarded input fields to the output row.
    var i = 0
    while (i < forwardedFieldCount) {
      output.setField(i, input.getField(i))
      i += 1
    }

    // Accumulate the input value and emit the current aggregate result
    // for each aggregate function.
    i = 0
    while (i < aggregates.length) {
      val index = forwardedFieldCount + i
      val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
      aggregates(i).accumulate(accumulator, input.getField(aggFields(i)))
      output.setField(index, aggregates(i).getValue(accumulator))
      i += 1
    }

    out.collect(output)
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    state.clear()
    if (null != accumulators) {
      state.add(accumulators)
    }
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val stateSerializer =
      aggregationStateType.createSerializer(getRuntimeContext.getExecutionConfig)
    val accumulatorsDescriptor = new ListStateDescriptor[Row]("overState", stateSerializer)
          — End diff –

Yes, I have created the JIRA: FLINK-5995 (https://issues.apache.org/jira/browse/FLINK-5995).
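For context, here is a minimal usage sketch (not part of the quoted diff) of how such a function would be attached to a non-keyed stream, assuming the non-keyed ProcessFunction support from PR #3438 is available. The names inputDS, aggregates, aggFields, forwardedFieldCount, accStateType, and outputType are illustrative placeholders, not identifiers from the PR:

// Hypothetical wiring sketch, not taken from the PR. Assumes `inputDS`,
// `aggregates`, `aggFields`, `forwardedFieldCount`, `accStateType`, and
// `outputType` are already provided by the translation layer.
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.types.Row

// The Scala `process` call needs an implicit TypeInformation for the result row.
implicit val resultType: TypeInformation[Row] = outputType

val overStream: DataStream[Row] = inputDS
  .process(new UnboundedNonPartitionedProcessingOverProcessFunction(
    aggregates,           // the AggregateFunctions of the OVER window, e.g. SUM and MIN
    aggFields,            // input field position feeding each aggregate
    forwardedFieldCount,  // input fields copied unchanged to the output row
    accStateType))        // RowTypeInfo describing the accumulator row
  .setParallelism(1)      // single task, since the query has no PARTITION BY clause

Because the stream is not keyed, the accumulator row is checkpointed as operator ListState through the CheckpointedFunction interface and lazily restored in processElement after recovery.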

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on the issue:

          https://github.com/apache/flink/pull/3491

Hi @fhueske, thanks a lot for your review. I have updated the PR according to your comments.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3491

Thanks for the update, @sunjincheng121.
The PR looks good to merge.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3491

          merging

          fhueske Fabian Hueske added a comment -

          Implemented with 7456d78d271b217c80d46e24029c55741807e51d

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3491


People

• Assignee: sunjincheng121 sunjincheng
• Reporter: sunjincheng121 sunjincheng
• Votes: 0
• Watchers: 4
