FLINK-5984

Add resetAccumulator method for AggregateFunction

    Details

    • Type: Improvement
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: Table API & SQL
    • Labels:
      None

      Description

      Right now we have to create a new accumulator object even if we just want to reset an existing one. We should allow passing the old one as a reuse object to AggregateFunction#createAccumulator. The aggregate function can then decide whether to create a new object or reset the old one.

          Activity

          fhueske Fabian Hueske added a comment -

          Yes, I think that would be a good idea. Many function objects can keep their accumulators as member variables and hold them across invocations. Instead of creating a new accumulator in each function invocation, the member variables can be reused.
          For the SumAggFunction it would look like this:

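          override def createAccumulator(reuse: Accumulator): Accumulator = {
            // Reuse the passed-in accumulator if there is one, otherwise allocate a new one.
            val acc = if (reuse == null) {
              new SumAccumulator[T]()
            } else {
              reuse.asInstanceOf[SumAccumulator[T]]
            }
            // Put the accumulator back into its initial state.
            acc.f0 = numeric.zero // sum
            acc.f1 = false
            acc
          }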
          
          ShaoxuanWang Shaoxuan Wang added a comment -

          Timo Walther, thanks for the suggestion. This looks like a new "resetAccumulator" method to me. It is valuable for DataSet, while DataStream does not need it for now.
          override def resetAccumulator(acc: Accumulator) = {
            val a = acc.asInstanceOf[SumAccumulator[T]]
            a.f0 = numeric.zero // sum
            a.f1 = false
          }

          Fabian's proposal also looks good to me, as long as we make the purpose of the "reuse" parameter clear to users with detailed annotations.
          Timo Walther, if you have not started on this JIRA issue, I can help make the changes.
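For illustration, here is a minimal, self-contained sketch of how a caller might use a resetAccumulator method to share one accumulator across several groups. Everything in it is a simplified stand-in written for this example and is not the actual Flink code: the Accumulator trait and SumAccumulator class only mimic the names used above, and LongSumAggFunction, ResetDemo and sumGroups are hypothetical names.

```scala
// Simplified stand-ins for illustration only; these are NOT the Flink classes.
trait Accumulator

// Mutable sum accumulator: f0 is the running sum, f1 records whether a value was added.
class SumAccumulator extends Accumulator {
  var f0: Long = 0L
  var f1: Boolean = false
}

// Hypothetical aggregate function over Long values with the proposed reset method.
class LongSumAggFunction {
  def createAccumulator(): Accumulator = new SumAccumulator

  def accumulate(acc: Accumulator, value: Long): Unit = {
    val a = acc.asInstanceOf[SumAccumulator]
    a.f0 += value
    a.f1 = true
  }

  def getValue(acc: Accumulator): Long = acc.asInstanceOf[SumAccumulator].f0

  // Puts an existing accumulator back into its initial state instead of allocating a new one.
  def resetAccumulator(acc: Accumulator): Unit = {
    val a = acc.asInstanceOf[SumAccumulator]
    a.f0 = 0L
    a.f1 = false
  }
}

object ResetDemo {
  // Sums each group using a single accumulator that is reset between groups.
  def sumGroups(groups: Seq[Seq[Long]]): Seq[Long] = {
    val agg = new LongSumAggFunction
    val acc = agg.createAccumulator() // allocated once, reused for every group
    groups.map { group =>
      agg.resetAccumulator(acc)
      group.foreach(v => agg.accumulate(acc, v))
      agg.getValue(acc)
    }
  }

  def main(args: Array[String]): Unit = {
    println(sumGroups(Seq(Seq(1L, 2L, 3L), Seq(10L), Seq.empty[Long])))
  }
}
```

In this sketch only one accumulator object is allocated regardless of the number of groups. The reuse-parameter variant of createAccumulator would achieve the same effect, with the reset logic placed inside createAccumulator.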

          githubbot ASF GitHub Bot added a comment -

          GitHub user shaoxuan-wang opened a pull request:

          https://github.com/apache/flink/pull/3496

          FLINK-5984 [table] add resetAccumulator method for AggregateFunction

          • [x] General
            • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
            • The pull request addresses only one issue
            • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [ ] Documentation
            • Documentation has been added for new functionality
            • Old documentation affected by the pull request has been updated
            • JavaDoc for public methods has been added
          • [x] Tests & Build
            • Functionality added by the pull request is covered by tests
            • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/shaoxuan-wang/flink F5984-submit

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3496.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3496


          commit d60ef3e67736c6cf366a486d0c57b13874c381bd
          Author: shaoxuan-wang <wshaoxuan@gmail.com>
          Date: 2017-03-08T15:10:28Z

          FLINK-5984 [table] add resetAccumulator method for AggregateFunction


          githubbot ASF GitHub Bot added a comment -

          Github user shaoxuan-wang commented on the issue:

          https://github.com/apache/flink/pull/3496

          @fhueske, do you plan to merge FLINK-5983 and FLINK-5963 soon? This PR will probably need a rebase once your two PRs are merged, since the changes overlap.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3496

          Thanks @shaoxuan-wang. The PR looks good.
          FLINK-5983 has been merged and FLINK-5963 will be merged soon.
          It would be great if you could rebase your code on master once that is done.

          Thanks, Fabian

          githubbot ASF GitHub Bot added a comment -

          Github user twalthr commented on the issue:

          https://github.com/apache/flink/pull/3496

          @fhueske and @shaoxuan-wang. I will rebase and merge this.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3496

          twalthr Timo Walther added a comment -

          Fixed in 1.3.0: 65ccf7cdfa9a52ccc7edcab3615b147f8dafdee7


            People

            • Assignee: Shaoxuan Wang (ShaoxuanWang)
            • Reporter: Timo Walther (twalthr)
            • Votes: 0
            • Watchers: 4
