Details

    • Type: Sub-task
    • Status: Closed
    • Priority: Major
    • Resolution: Implemented
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Table API & SQL
    • Labels:
      None

      Issue Links

        Activity

        Hide
        rtudoran radu added a comment -

        can you please post a description of what you aim to do?
        I am curious if you target to use code generation for user define aggregates or also for others. In case you aim for others as well I would like to understand what are the advantages of moving to code generation?

        Show
        rtudoran radu added a comment - can you please post a description of what you aim to do? I am curious if you target to use code generation for user define aggregates or also for others. In case you aim for others as well I would like to understand what are the advantages of moving to code generation?
        Hide
        rtudoran radu added a comment -

        shaoxuan wang i realized i did not notify you

        Show
        rtudoran radu added a comment - shaoxuan wang i realized i did not notify you
        Hide
        githubbot ASF GitHub Bot added a comment -

        GitHub user shaoxuan-wang opened a pull request:

        https://github.com/apache/flink/pull/3676

        FLINK-6241 [table] codeGen dataStream aggregates that use ProcessFunction

        Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
        If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
        In addition to going through the list, please provide a meaningful description of your changes.

        • [x] General
        • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
        • The pull request addresses only one issue
        • Each commit in the PR has a meaningful commit message (including the JIRA id)
        • [ ] Documentation
        • Documentation has been added for new functionality
        • Old documentation affected by the pull request has been updated
        • JavaDoc for public methods has been added
        • [x] Tests & Build
        • Functionality added by the pull request is covered by tests
        • `mvn clean verify` has been executed successfully locally or a Travis build has passed

        You can merge this pull request into a Git repository by running:

        $ git pull https://github.com/shaoxuan-wang/flink F6241-submit

        Alternatively you can review and apply these changes as the patch at:

        https://github.com/apache/flink/pull/3676.patch

        To close this pull request, make a commit to your master/trunk branch
        with (at least) the following in the commit message:

        This closes #3676


        commit 11d35093714984c4e27742c4105b70e474897707
        Author: shaoxuan-wang <wshaoxuan@gmail.com>
        Date: 2017-04-05T14:34:43Z

        FLINK-6241 [table] codeGen dataStream aggregates that use ProcessFunction


        Show
        githubbot ASF GitHub Bot added a comment - GitHub user shaoxuan-wang opened a pull request: https://github.com/apache/flink/pull/3676 FLINK-6241 [table] codeGen dataStream aggregates that use ProcessFunction Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide] ( http://flink.apache.org/how-to-contribute.html ). In addition to going through the list, please provide a meaningful description of your changes. [x] General The pull request references the related JIRA issue (" [FLINK-XXX] Jira title text") The pull request addresses only one issue Each commit in the PR has a meaningful commit message (including the JIRA id) [ ] Documentation Documentation has been added for new functionality Old documentation affected by the pull request has been updated JavaDoc for public methods has been added [x] Tests & Build Functionality added by the pull request is covered by tests `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/shaoxuan-wang/flink F6241-submit Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3676.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3676 commit 11d35093714984c4e27742c4105b70e474897707 Author: shaoxuan-wang <wshaoxuan@gmail.com> Date: 2017-04-05T14:34:43Z FLINK-6241 [table] codeGen dataStream aggregates that use ProcessFunction
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3676#discussion_r110032955

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateHelper.scala —
        @@ -0,0 +1,73 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +
        +package org.apache.flink.table.runtime.aggregate
        +
        +import org.apache.flink.api.common.functions.Function
        +import org.apache.flink.types.Row
        +
        +/**
        + * Base class for Aggregate Helper Function.
        + */
        +abstract class AggregateHelper extends Function {
        +
        + /**
        + * Calculate the results from accumulators, and set the results to the output
        + *
        + * @param accumulators the accumulators (saved in a row) which contains the current
        + * aggregated results
        + * @param output output results collected in a row
        + * @param rowOffset offset of the position (in the output row) where the accumulators
        + * starts
        + */
        + def setOutput(accumulators: Row, output: Row, rowOffset: Int)
        +
        + /**
        + * Accumulate the input values to the accumulators
        + *
        + * @param accumulators the accumulators (saved in a row) which contains the current
        + * aggregated results
        + * @param input input values bundled in a row
        + */
        + def accumulate(accumulators: Row, input: Row)
        +
        + /**
        + * Retract the input values from the accumulators
        + *
        + * @param accumulators the accumulators (saved in a row) which contains the current
        + * aggregated results
        + * @param input input values bundled in a row
        + */
        + def retract(accumulators: Row, input: Row)
        +
        + /**
        + * Init the accumulators, and save them to a accumulators Row.
        + *
        + * @return a row of accumulators which contains the aggregated results
        + */
        + def createAccumulator(): Row
        — End diff –

        rename to `createAccumulators()` (+s)?

        Show
        githubbot ASF GitHub Bot added a comment - Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3676#discussion_r110032955 — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateHelper.scala — @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.api.common.functions.Function +import org.apache.flink.types.Row + +/** + * Base class for Aggregate Helper Function. + */ +abstract class AggregateHelper extends Function { + + /** + * Calculate the results from accumulators, and set the results to the output + * + * @param accumulators the accumulators (saved in a row) which contains the current + * aggregated results + * @param output output results collected in a row + * @param rowOffset offset of the position (in the output row) where the accumulators + * starts + */ + def setOutput(accumulators: Row, output: Row, rowOffset: Int) + + /** + * Accumulate the input values to the accumulators + * + * @param accumulators the accumulators (saved in a row) which contains the current + * aggregated results + * @param input input values bundled in a row + */ + def accumulate(accumulators: Row, input: Row) + + /** + * Retract the input values from the accumulators + * + * @param accumulators the accumulators (saved in a row) which contains the current + * aggregated results + * @param input input values bundled in a row + */ + def retract(accumulators: Row, input: Row) + + /** + * Init the accumulators, and save them to a accumulators Row. + * + * @return a row of accumulators which contains the aggregated results + */ + def createAccumulator(): Row — End diff – rename to `createAccumulators()` (+s)?
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3676#discussion_r110031845

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
        @@ -239,6 +241,212 @@ class CodeGenerator(
        }

        /**
        + * Generates a [[org.apache.flink.table.runtime.aggregate.AggregateHelper]] that can be passed
        + * to Java compiler.
        + *
        + * @param name Class name of the Function. Must not be unique but has to be a valid Java
        + * class identifier.
        + * @param clazz AggregateHelper Function to be generated.
        + * @param aggFields position (in the input Row) of the input value for each aggregate
        + * @param aggregates List of all aggregate functions
        + * @param generator code generator instance
        + * @param inputType Input row type
        + *
        + * @tparam F Flink Function to be generated.
        + * @return instance of GeneratedFunction
        + */
        + def generateAggregateHelper[F <: Function](
        — End diff –

        rename to `generateAggregations`?

        Show
        githubbot ASF GitHub Bot added a comment - Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3676#discussion_r110031845 — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala — @@ -239,6 +241,212 @@ class CodeGenerator( } /** + * Generates a [ [org.apache.flink.table.runtime.aggregate.AggregateHelper] ] that can be passed + * to Java compiler. + * + * @param name Class name of the Function. Must not be unique but has to be a valid Java + * class identifier. + * @param clazz AggregateHelper Function to be generated. + * @param aggFields position (in the input Row) of the input value for each aggregate + * @param aggregates List of all aggregate functions + * @param generator code generator instance + * @param inputType Input row type + * + * @tparam F Flink Function to be generated. + * @return instance of GeneratedFunction + */ + def generateAggregateHelper [F <: Function] ( — End diff – rename to `generateAggregations`?
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3676#discussion_r110032722

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateHelper.scala —
        @@ -0,0 +1,73 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +
        +package org.apache.flink.table.runtime.aggregate
        +
        +import org.apache.flink.api.common.functions.Function
        +import org.apache.flink.types.Row
        +
        +/**
        + * Base class for Aggregate Helper Function.
        + */
        +abstract class AggregateHelper extends Function {
        — End diff –

        rename to `GeneratedAggregations`?

        Show
        githubbot ASF GitHub Bot added a comment - Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3676#discussion_r110032722 — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateHelper.scala — @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.api.common.functions.Function +import org.apache.flink.types.Row + +/** + * Base class for Aggregate Helper Function. + */ +abstract class AggregateHelper extends Function { — End diff – rename to `GeneratedAggregations`?
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3676#discussion_r110032217

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
        @@ -239,6 +241,212 @@ class CodeGenerator(
        }

        /**
        + * Generates a [[org.apache.flink.table.runtime.aggregate.AggregateHelper]] that can be passed
        + * to Java compiler.
        + *
        + * @param name Class name of the Function. Must not be unique but has to be a valid Java
        + * class identifier.
        + * @param clazz AggregateHelper Function to be generated.
        + * @param aggFields position (in the input Row) of the input value for each aggregate
        + * @param aggregates List of all aggregate functions
        + * @param generator code generator instance
        + * @param inputType Input row type
        + *
        + * @tparam F Flink Function to be generated.
        + * @return instance of GeneratedFunction
        + */
        + def generateAggregateHelper[F <: Function](
        + name: String,
        + clazz: Class[F],
        — End diff –

        I don't think we need the `Class` parameter

        Show
        githubbot ASF GitHub Bot added a comment - Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3676#discussion_r110032217 — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala — @@ -239,6 +241,212 @@ class CodeGenerator( } /** + * Generates a [ [org.apache.flink.table.runtime.aggregate.AggregateHelper] ] that can be passed + * to Java compiler. + * + * @param name Class name of the Function. Must not be unique but has to be a valid Java + * class identifier. + * @param clazz AggregateHelper Function to be generated. + * @param aggFields position (in the input Row) of the input value for each aggregate + * @param aggregates List of all aggregate functions + * @param generator code generator instance + * @param inputType Input row type + * + * @tparam F Flink Function to be generated. + * @return instance of GeneratedFunction + */ + def generateAggregateHelper [F <: Function] ( + name: String, + clazz: Class [F] , — End diff – I don't think we need the `Class` parameter
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3676#discussion_r110032463

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala —
        @@ -58,6 +58,16 @@ case class GeneratedFunction[F <: Function, T <: Any](
        code: String)

        /**
        + * Describes a generated aggregate helper function
        + *
        + * @param name class name of the generated Function.
        + * @param code code of the generated Function.
        + */
        +case class AggregateHelperFunction(
        — End diff –

        rename to `GeneratedAggregationsFunction`?

        Show
        githubbot ASF GitHub Bot added a comment - Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3676#discussion_r110032463 — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala — @@ -58,6 +58,16 @@ case class GeneratedFunction [F <: Function, T <: Any] ( code: String) /** + * Describes a generated aggregate helper function + * + * @param name class name of the generated Function. + * @param code code of the generated Function. + */ +case class AggregateHelperFunction( — End diff – rename to `GeneratedAggregationsFunction`?
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3676#discussion_r110032776

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateHelper.scala —
        @@ -0,0 +1,73 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +
        +package org.apache.flink.table.runtime.aggregate
        +
        +import org.apache.flink.api.common.functions.Function
        +import org.apache.flink.types.Row
        +
        +/**
        + * Base class for Aggregate Helper Function.
        + */
        +abstract class AggregateHelper extends Function {
        +
        + /**
        + * Calculate the results from accumulators, and set the results to the output
        + *
        + * @param accumulators the accumulators (saved in a row) which contains the current
        + * aggregated results
        + * @param output output results collected in a row
        + * @param rowOffset offset of the position (in the output row) where the accumulators
        + * starts
        + */
        + def setOutput(accumulators: Row, output: Row, rowOffset: Int)
        — End diff –

        rename to `setAggregationResults()`?

        Show
        githubbot ASF GitHub Bot added a comment - Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3676#discussion_r110032776 — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateHelper.scala — @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.api.common.functions.Function +import org.apache.flink.types.Row + +/** + * Base class for Aggregate Helper Function. + */ +abstract class AggregateHelper extends Function { + + /** + * Calculate the results from accumulators, and set the results to the output + * + * @param accumulators the accumulators (saved in a row) which contains the current + * aggregated results + * @param output output results collected in a row + * @param rowOffset offset of the position (in the output row) where the accumulators + * starts + */ + def setOutput(accumulators: Row, output: Row, rowOffset: Int) — End diff – rename to `setAggregationResults()`?
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3676#discussion_r110032835

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateHelper.scala —
        @@ -0,0 +1,73 @@
        +/*
        + * Licensed to the Apache Software Foundation (ASF) under one
        + * or more contributor license agreements. See the NOTICE file
        + * distributed with this work for additional information
        + * regarding copyright ownership. The ASF licenses this file
        + * to you under the Apache License, Version 2.0 (the
        + * "License"); you may not use this file except in compliance
        + * with the License. You may obtain a copy of the License at
        + *
        + * http://www.apache.org/licenses/LICENSE-2.0
        + *
        + * Unless required by applicable law or agreed to in writing, software
        + * distributed under the License is distributed on an "AS IS" BASIS,
        + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        + * See the License for the specific language governing permissions and
        + * limitations under the License.
        + */
        +
        +package org.apache.flink.table.runtime.aggregate
        +
        +import org.apache.flink.api.common.functions.Function
        +import org.apache.flink.types.Row
        +
        +/**
        + * Base class for Aggregate Helper Function.
        + */
        +abstract class AggregateHelper extends Function {
        +
        + /**
        + * Calculate the results from accumulators, and set the results to the output
        + *
        + * @param accumulators the accumulators (saved in a row) which contains the current
        + * aggregated results
        + * @param output output results collected in a row
        + * @param rowOffset offset of the position (in the output row) where the accumulators
        + * starts
        + */
        + def setOutput(accumulators: Row, output: Row, rowOffset: Int)
        +
        + /**
        + * Accumulate the input values to the accumulators
        + *
        + * @param accumulators the accumulators (saved in a row) which contains the current
        + * aggregated results
        + * @param input input values bundled in a row
        + */
        + def accumulate(accumulators: Row, input: Row)
        +
        + /**
        + * Retract the input values from the accumulators
        + *
        + * @param accumulators the accumulators (saved in a row) which contains the current
        + * aggregated results
        + * @param input input values bundled in a row
        + */
        + def retract(accumulators: Row, input: Row)
        +
        + /**
        + * Init the accumulators, and save them to a accumulators Row.
        + *
        + * @return a row of accumulators which contains the aggregated results
        + */
        + def createAccumulator(): Row
        +
        + /**
        + * Init the accumulators, and save them to the input accumulators Row.
        + *
        + * @param input input values bundled in a row
        + * @param output output results collected in a row
        + */
        + def forwardValueToOutput(input: Row, output: Row)
        — End diff –

        rename to `setForwardedFields()`?

        Show
        githubbot ASF GitHub Bot added a comment - Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3676#discussion_r110032835 — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateHelper.scala — @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.api.common.functions.Function +import org.apache.flink.types.Row + +/** + * Base class for Aggregate Helper Function. + */ +abstract class AggregateHelper extends Function { + + /** + * Calculate the results from accumulators, and set the results to the output + * + * @param accumulators the accumulators (saved in a row) which contains the current + * aggregated results + * @param output output results collected in a row + * @param rowOffset offset of the position (in the output row) where the accumulators + * starts + */ + def setOutput(accumulators: Row, output: Row, rowOffset: Int) + + /** + * Accumulate the input values to the accumulators + * + * @param accumulators the accumulators (saved in a row) which contains the current + * aggregated results + * @param input input values bundled in a row + */ + def accumulate(accumulators: Row, input: Row) + + /** + * Retract the input values from the accumulators + * + * @param accumulators the accumulators (saved in a row) which contains the current + * aggregated results + * @param input input values bundled in a row + */ + def retract(accumulators: Row, input: Row) + + /** + * Init the accumulators, and save them to a accumulators Row. + * + * @return a row of accumulators which contains the aggregated results + */ + def createAccumulator(): Row + + /** + * Init the accumulators, and save them to the input accumulators Row. + * + * @param input input values bundled in a row + * @param output output results collected in a row + */ + def forwardValueToOutput(input: Row, output: Row) — End diff – rename to `setForwardedFields()`?
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user fhueske commented on a diff in the pull request:

        https://github.com/apache/flink/pull/3676#discussion_r110032272

        — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala —
        @@ -239,6 +241,212 @@ class CodeGenerator(
        }

        /**
        + * Generates a [[org.apache.flink.table.runtime.aggregate.AggregateHelper]] that can be passed
        + * to Java compiler.
        + *
        + * @param name Class name of the Function. Must not be unique but has to be a valid Java
        + * class identifier.
        + * @param clazz AggregateHelper Function to be generated.
        + * @param aggFields position (in the input Row) of the input value for each aggregate
        + * @param aggregates List of all aggregate functions
        + * @param generator code generator instance
        + * @param inputType Input row type
        + *
        + * @tparam F Flink Function to be generated.
        + * @return instance of GeneratedFunction
        + */
        + def generateAggregateHelper[F <: Function](
        — End diff –

        The generic type can be removed

        Show
        githubbot ASF GitHub Bot added a comment - Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3676#discussion_r110032272 — Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala — @@ -239,6 +241,212 @@ class CodeGenerator( } /** + * Generates a [ [org.apache.flink.table.runtime.aggregate.AggregateHelper] ] that can be passed + * to Java compiler. + * + * @param name Class name of the Function. Must not be unique but has to be a valid Java + * class identifier. + * @param clazz AggregateHelper Function to be generated. + * @param aggFields position (in the input Row) of the input value for each aggregate + * @param aggregates List of all aggregate functions + * @param generator code generator instance + * @param inputType Input row type + * + * @tparam F Flink Function to be generated. + * @return instance of GeneratedFunction + */ + def generateAggregateHelper [F <: Function] ( — End diff – The generic type can be removed
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user shaoxuan-wang commented on the issue:

        https://github.com/apache/flink/pull/3676

        Thanks @fhueske, these changes look good to me.

        Show
        githubbot ASF GitHub Bot added a comment - Github user shaoxuan-wang commented on the issue: https://github.com/apache/flink/pull/3676 Thanks @fhueske, these changes look good to me.
        Hide
        githubbot ASF GitHub Bot added a comment -

        Github user asfgit closed the pull request at:

        https://github.com/apache/flink/pull/3676

        Show
        githubbot ASF GitHub Bot added a comment - Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3676
        Hide
        fhueske Fabian Hueske added a comment -

        Implemented with 89d9dec388ccd8d7d9860d7b550f29a083d2605d

        Show
        fhueske Fabian Hueske added a comment - Implemented with 89d9dec388ccd8d7d9860d7b550f29a083d2605d

          People

          • Assignee:
            ShaoxuanWang Shaoxuan Wang
            Reporter:
            ShaoxuanWang Shaoxuan Wang
          • Votes:
            0 Vote for this issue
            Watchers:
            4 Start watching this issue

            Dates

            • Created:
              Updated:
              Resolved:

              Development