Flink / FLINK-5720

Deprecate "Folding" in all of DataStream API

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0, 1.4.0
    • Component/s: DataStream API
    • Labels:
      None

      Description

      Folding is an operation that cannot be done incrementally in a distributed way (there is no way to merge two partially folded results), and it also cannot be done on merging windows. Now that we have AggregatingState and aggregate operations, we should deprecate folding in the APIs and deprecate FoldingState.

      I suggest removing folding completely in Flink 2.0.
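To make the merging argument concrete, here is a minimal plain-Java sketch with no Flink dependency. The `AggregateFn` interface is a hypothetical local stand-in that mirrors the four-method shape of Flink's `AggregateFunction` (`createAccumulator`, `add`, `getResult`, `merge`); exact signatures, such as whether `add` returns the accumulator, may differ between Flink versions. An average keeps a small {sum, count} accumulator that can be merged, which is exactly what a fold over an initial value lacks.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical local stand-in mirroring the shape of Flink's AggregateFunction.
interface AggregateFn<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC acc);
    OUT getResult(ACC acc);
    ACC merge(ACC a, ACC b);
}

// Average of longs with a mergeable {sum, count} accumulator.
class AverageAggregate implements AggregateFn<Long, long[], Double> {
    public long[] createAccumulator() {
        return new long[] {0L, 0L};
    }

    public long[] add(Long value, long[] acc) {
        acc[0] += value; // running sum
        acc[1] += 1;     // running count
        return acc;
    }

    public Double getResult(long[] acc) {
        return acc[1] == 0 ? Double.NaN : (double) acc[0] / acc[1];
    }

    // Combines partial results, e.g. from two parallel instances or two merging windows.
    public long[] merge(long[] a, long[] b) {
        return new long[] {a[0] + b[0], a[1] + b[1]};
    }
}

class MergeDemo {
    // Feeds a list of values through a fresh accumulator.
    static <IN, ACC, OUT> ACC accumulate(AggregateFn<IN, ACC, OUT> fn, List<IN> values) {
        ACC acc = fn.createAccumulator();
        for (IN v : values) {
            acc = fn.add(v, acc);
        }
        return acc;
    }
}
```

Aggregating [1, 2] and [3, 4] separately and merging the accumulators yields the same 2.5 as a single pass over all four values. A fold such as `(str, i) => str + "-" + i` with initial value "start" offers no merge function, and naively concatenating two partial results would repeat the initial value ("start-1-2start-3-4").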

          Activity

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/3816

          Hi @iduanyingjie, the replacement for fold has not been implemented yet. So you have to stick with `fold()` for now.

          `fold`'s replacement will correspond to `WindowedStream.aggregate(AggregateFunction)`.

          githubbot ASF GitHub Bot added a comment -

          Github user iduanyingjie commented on the issue:

          https://github.com/apache/flink/pull/3816

          This document:
          https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/index.html#datastream-transformations
          contains the example `keyedStream.fold("start")((str, i) => { str + "-" + i })`.
          I want to do the same thing without using a deprecated API. What am I supposed to do?
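For the windowed case, a sketch of how the `fold("start")((str, i) => str + "-" + i)` example could be expressed as an aggregate: the accumulator holds only the "-i" suffixes, and the "start" prefix is applied once in `getResult`, which keeps `merge` a plain concatenation. `AggregateFn` is again a hypothetical local stand-in for Flink's `AggregateFunction` so the sketch runs without Flink; in a real job an implementation along these lines would be handed to `WindowedStream#aggregate`. Element order within a window, and the order in which accumulators are merged, is not something this sketch guarantees.

```java
// Hypothetical local stand-in mirroring the shape of Flink's AggregateFunction.
interface AggregateFn<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC acc);
    OUT getResult(ACC acc);
    ACC merge(ACC a, ACC b);
}

// Possible replacement for fold("start")((str, i) => str + "-" + i).
class StartJoinAggregate implements AggregateFn<Integer, String, String> {
    // Accumulator holds only the "-i" suffixes, not the initial value.
    public String createAccumulator() {
        return "";
    }

    public String add(Integer value, String acc) {
        return acc + "-" + value;
    }

    // The old fold's initial value becomes a prefix applied once at the end.
    public String getResult(String acc) {
        return "start" + acc;
    }

    // Suffix strings concatenate without duplicating the prefix.
    public String merge(String a, String b) {
        return a + b;
    }
}
```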

          githubbot ASF GitHub Bot added a comment -

          Github user NicoK commented on the issue:

          https://github.com/apache/flink/pull/3816

          Actually, `KeyedStream.aggregate` is protected and thus not accessible.

          You may, however, access `WindowedStream#aggregate(AggregateFunction<T,ACC,R>)` and `AllWindowedStream#aggregate(AggregateFunction<T,ACC,R>)` for windowed streams and use a stateful map or flatMap for the rest.
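For non-windowed streams, the stateful-map route could look roughly like this: per key, keep the current folded value in state and emit the updated value for every incoming element, mirroring the rolling per-element output of the non-windowed fold. In Flink the state would typically be a `ValueState` inside a `RichMapFunction` applied to the keyed stream; the `HashMap` below is an assumption standing in for that keyed state so the sketch runs without Flink, and `RunningStartJoin` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

// Emulates a keyed, stateful map replacing keyedStream.fold("start")((str, i) => str + "-" + i).
// The HashMap stands in for Flink's per-key ValueState (assumption for this sketch).
class RunningStartJoin<K> {
    private final Map<K, String> stateByKey = new HashMap<>();

    // Called once per element; returns the value the operator would emit downstream.
    String map(K key, int value) {
        String current = stateByKey.getOrDefault(key, "start"); // initial fold value
        String updated = current + "-" + value;
        stateByKey.put(key, updated);
        return updated;
    }
}
```

Because each key has its own entry, interleaved elements for different keys do not affect each other's running value.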

          githubbot ASF GitHub Bot added a comment -

          Github user wuchong commented on the issue:

          https://github.com/apache/flink/pull/3816

          Hi @caddac, you can use `AggregateFunction` and `KeyedStream.aggregate(AggregationFunction<T> aggregate)` instead of fold.

          githubbot ASF GitHub Bot added a comment -

          Github user caddac commented on the issue:

          https://github.com/apache/flink/pull/3816

          Sorry for commenting on this closed PR, but can you point me to documentation for what will replace fold? I'm starting a new project using 1.3 and don't want to use a deprecated API, but don't find much on what I should be replacing fold with. Thanks

          Zentol Chesnay Schepler added a comment -

          1.3: e5adf11342337994ea9da3f50ce7b587086bf820
          1.4: 50baec6e8ec28663c5db70e0b95b0c8f78c3e3cd

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/3816

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3816

          Cool, thanks for working on this (and the fold PR): 👍

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3816

          merging.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3816

          I think we got them all now. I would have missed the one in `TypeExtractor`...

          LGTM (looks good to merge) now!

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3816#discussion_r114764971

          --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala ---
          @@ -183,7 +183,10 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
              * Creates a new [[DataStream]] by folding the elements of this DataStream
              * using an associative fold function and an initial value. An independent
              * aggregate is kept per key.
          +   *
          +   * @deprecated will be removed in a future version
              */
          +  @Deprecated
          --- End diff ---

          I've included them in the annotation.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3816#discussion_r114756625

          --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala ---
          @@ -183,7 +183,10 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
              * Creates a new [[DataStream]] by folding the elements of this DataStream
              * using an associative fold function and an initial value. An independent
              * aggregate is kept per key.
          +   *
          +   * @deprecated will be removed in a future version
              */
          +  @Deprecated
          --- End diff ---

          I don't know, actually. It seems actual (random) code on GitHub only has it in the annotation: https://github.com/ReactiveX/RxScala/blob/0.x/src/main/scala/rx/lang/scala/Observable.scala#L4876

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on the issue:

          https://github.com/apache/flink/pull/3816

          Man, that stuff is everywhere, isn't it...

          githubbot ASF GitHub Bot added a comment -

          Github user zentol commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3816#discussion_r114751208

          --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala ---
          @@ -183,7 +183,10 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
              * Creates a new [[DataStream]] by folding the elements of this DataStream
              * using an associative fold function and an initial value. An independent
              * aggregate is kept per key.
          +   *
          +   * @deprecated will be removed in a future version
              */
          +  @Deprecated
          --- End diff ---

          Do you propose to only have the reason in the annotation, or in both annotation and docs?

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3816#discussion_r114743490

          --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala ---
          @@ -183,7 +183,10 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
              * Creates a new [[DataStream]] by folding the elements of this DataStream
              * using an associative fold function and an initial value. An independent
              * aggregate is kept per key.
          +   *
          +   * @deprecated will be removed in a future version
              */
          +  @Deprecated
          --- End diff ---

          Scala has `@deprecated`, where you can also specify the reason right in the annotation (https://coderwall.com/p/k4ulyg/scala-deprecated-annotation).

          This also holds for the other instances in Scala code.

          aljoscha Aljoscha Krettek added a comment -

          While we're on this, should we deprecate all aggregation operations on `KeyedStream`? Right now, we have fold and reduce while the newer aggregate is not even there.

          githubbot ASF GitHub Bot added a comment -

          GitHub user zentol opened a pull request:

          https://github.com/apache/flink/pull/3816

          FLINK-5720 Deprecate DataStream#fold()

          This PR deprecates the various `DataStream#fold()` methods and all related internal classes.
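On the Java side, deprecation conventionally pairs the `@Deprecated` annotation (which triggers compiler warnings and is retained at runtime) with a `@deprecated` Javadoc tag carrying the reason and the replacement. Unlike Scala's `@deprecated("message", "since")`, Java's annotation has no message element (Java 9 added only `since` and `forRemoval`), so the explanation lives in the Javadoc. `ExampleStream` and its methods are hypothetical and not Flink's actual code.

```java
// Hypothetical API illustrating the Java deprecation pattern; not Flink's actual class.
class ExampleStream {
    /**
     * Folds elements into a running value.
     *
     * @deprecated will be removed in a future version. Use {@link #aggregate()} instead.
     */
    @Deprecated
    public void fold() {
        // legacy implementation elided in this sketch
    }

    /** Hypothetical replacement API. */
    public void aggregate() {
        // replacement implementation elided in this sketch
    }
}
```

Since `@Deprecated` has runtime retention, tooling (or a test) can detect deprecated methods via reflection.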

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/zentol/flink 5720_deprecate_folding

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3816.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3816


          commit 40fd483992f7820f8e6f7b26cd7e95a6d5f71507
          Author: zentol <chesnay@apache.org>
          Date: 2017-05-03T13:49:03Z

          FLINK-5720 Deprecate DataStream#fold()


          StephanEwen Stephan Ewen added a comment -

          +1

          I think fold causes a lot of problems internally and blocks many optimizations of state size and performance.

          The AggregateFunction should be able to subsume all the fold() use cases.
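One way to see why an aggregate function can subsume `fold()` is a generic adapter: buffer the elements in the accumulator, concatenate buffers in `merge`, and run the fold only in `getResult`. This is a sketch under assumptions: `AggregateFn` is a local stand-in for Flink's `AggregateFunction`, `FoldAsAggregate` is a hypothetical name, and the initial value is assumed immutable. It also shows the cost, since buffering every element gives up the compact incremental state, so a hand-written accumulator is preferable whenever one exists.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical local stand-in mirroring the shape of Flink's AggregateFunction.
interface AggregateFn<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC acc);
    OUT getResult(ACC acc);
    ACC merge(ACC a, ACC b);
}

// Expresses any fold(initial, folder) as an aggregate by buffering the elements.
class FoldAsAggregate<IN, R> implements AggregateFn<IN, List<IN>, R> {
    private final R initial; // assumed immutable, since it is reused for every result
    private final BiFunction<R, IN, R> folder;

    FoldAsAggregate(R initial, BiFunction<R, IN, R> folder) {
        this.initial = initial;
        this.folder = folder;
    }

    public List<IN> createAccumulator() {
        return new ArrayList<>();
    }

    public List<IN> add(IN value, List<IN> acc) {
        acc.add(value);
        return acc;
    }

    // The actual fold runs only when a result is requested.
    public R getResult(List<IN> acc) {
        R result = initial;
        for (IN v : acc) {
            result = folder.apply(result, v);
        }
        return result;
    }

    // Buffers concatenate; order follows the order of the merged accumulators.
    public List<IN> merge(List<IN> a, List<IN> b) {
        List<IN> merged = new ArrayList<>(a);
        merged.addAll(b);
        return merged;
    }
}
```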


            People

            • Assignee: Chesnay Schepler (Zentol)
            • Reporter: Aljoscha Krettek (aljoscha)
            • Votes: 0
            • Watchers: 4
