  1. Flink
  2. FLINK-3899

Document window processing with Reduce/FoldFunction + WindowFunction

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.1.0
    • Fix Version/s: 1.2.0, 1.1.2
    • Component/s: Documentation, Streaming
    • Labels:
      None

      Description

      The streaming documentation does not describe how windows can be processed with a FoldFunction or ReduceFunction followed by a WindowFunction. This combination allows eager window aggregation (only a single element is kept in the window) while still giving access to the Window object, e.g., to read the window's start and end time.
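To make the pattern concrete, here is a minimal plain-Java sketch of it. This is not Flink code: `EagerWindowSketch`, `Window`, `WindowCallback`, and `processWindow` are hypothetical names invented for illustration, and the real Flink interfaces have different signatures. The sketch only shows the shape of the idea: elements are combined one at a time so that a single value is held, and the final step sees both that value and the window's metadata.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.BinaryOperator;

// Plain-Java illustration only: none of these types are Flink classes.
final class EagerWindowSketch {

    /** Hypothetical window metadata: start and end timestamps in milliseconds. */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    /** Hypothetical callback, invoked once per window with the single pre-aggregated value. */
    interface WindowCallback<T, R> {
        R apply(Window window, T aggregate);
    }

    /** Combines elements one at a time, then passes the one remaining value and the window to the callback. */
    static <T, R> R processWindow(Window window, List<T> elements,
                                  BinaryOperator<T> reducer, WindowCallback<T, R> callback) {
        Iterator<T> it = elements.iterator();
        if (!it.hasNext()) {
            throw new IllegalArgumentException("a window needs at least one element");
        }
        T aggregate = it.next(); // the only value held for this window
        while (it.hasNext()) {
            aggregate = reducer.apply(aggregate, it.next());
        }
        return callback.apply(window, aggregate);
    }
}
```

For example, calling `processWindow` with `Integer::sum` as the reducer and a callback that formats `window.start`, `window.end`, and the sum yields one result per window that combines the aggregate with the window bounds.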

        Issue Links

          Activity

          danielblazevski Daniel Blazevski added a comment - - edited

          Fabian Hueske

          I'm happy to take this one if that's OK. I've contributed a bit to FlinkML and have been learning and playing around with Flink DataStreams, so this could be a good way to dig a bit deeper into Flink's streaming APIs (and possibly contribute code later).

          To clarify, the idea is simply to have an example with

          myWindow.fold(...).keyBy(0).window(...)
          and
          myWindow.reduce(...).keyBy(0).window(...)

          Is that correct?

          aljoscha Aljoscha Krettek added a comment -

          Daniel Blazevski this is meant to address WindowedStream.apply(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) and the equivalent with a FoldFunction.

          danielblazevski Daniel Blazevski added a comment -

          Aljoscha Krettek

          I see, would it be enough to expand on the example "WindowFunction with Incremental Aggregation" here:
          https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html

          with specific examples for MyFoldFunction, MyReduceFunction, MyWindowFunction?

          aljoscha Aljoscha Krettek added a comment -

          Yes, I think that would be enough.

          githubbot ASF GitHub Bot added a comment -

          GitHub user danielblazevski opened a pull request:

          https://github.com/apache/flink/pull/2368

          FLINK-3899 Document window processing with Reduce/FoldFunction + WindowFunction

          Added example of using Reduce/Fold + Window in docs. The examples were tested to run – I used the sensor reading class from dataArtisans/blogposts.

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/danielblazevski/flink FLINK-3899

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2368.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2368


          commit 3b7466b161eecfdb7b862dafce0ed4fac2248ecd
          Author: danielblazevski <daniel.blazevski@gmail.com>
          Date: 2016-08-13T22:57:18Z

          added example of fold/window and reduce/window

          commit 9dfc44f2867ca1d5f92beb625f264c0d92632784
          Author: danielblazevski <daniel.blazevski@gmail.com>
          Date: 2016-08-13T22:58:19Z

          added example of fold/window and reduce/window

          commit 44553dbfc97e04e3f36eaec9a2e24a08e886e26f
          Author: danielblazevski <daniel.blazevski@gmail.com>
          Date: 2016-08-13T23:07:29Z

          removed throws exception

          commit c121b51a08a0d4c73eb703fa68e17b90b8d5fc17
          Author: danielblazevski <daniel.blazevski@gmail.com>
          Date: 2016-08-13T23:11:30Z

          changed String var to in window fns

          commit 747e989001651a31194a9d6f3071f57518c0c03c
          Author: danielblazevski <daniel.blazevski@gmail.com>
          Date: 2016-08-13T23:14:48Z

          changed start time to end time typo

          commit 9669866440b55740eecc16250b6cfb48636e7ad7
          Author: danielblazevski <daniel.blazevski@gmail.com>
          Date: 2016-08-13T23:16:29Z

          changed to typo

          commit d72eb69806f89b9089731edbb8c7d7d630e4877e
          Author: danielblazevski <daniel.blazevski@gmail.com>
          Date: 2016-08-13T23:41:09Z

          added scala example

          commit a8ccb49928edf9b0b37acb67439519f82e412988
          Author: danielblazevski <daniel.blazevski@gmail.com>
          Date: 2016-08-13T23:46:19Z

          format: added empty line after each class def for java

          commit 071b706f0b9d52d4232c20f68a87aa5514a0a40d
          Author: danielblazevski <daniel.blazevski@gmail.com>
          Date: 2016-08-13T23:49:02Z

          changed names of iterable inputs

          commit a55305f9b57d812b81b6cd6849b658f5c17344f8
          Author: danielblazevski <daniel.blazevski@gmail.com>
          Date: 2016-08-13T23:50:53Z

          changed MIN_VALUE to MinValue for Scala

          commit d404568663d375acfa2e9e48b79f775f4e2bb37c
          Author: danielblazevski <daniel.blazevski@gmail.com>
          Date: 2016-08-13T23:52:41Z

          removed semicolon in Scala example


          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2368#discussion_r75893984

          — Diff: docs/apis/streaming/windows.md —
          @@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit of incremental window compu
          the additional meta information that writing a `WindowFunction` provides.

          This is an example that shows how incremental aggregation functions can be combined with
          -a `WindowFunction`.
          +a `WindowFunction`. The `FoldFunction`/`WindowFunction` example shows how to extract the
          +ending event-time of a window of sensor readings that contain a timestamp,
          +and the `ReduceFunction`/`WindowFunctions` example shows how to do eager window
          +aggregation (only a single element is kept in the window).

          <div class="codetabs" markdown="1">
          <div data-lang="java" markdown="1">

          {% highlight java %}

          -DataStream<Tuple2<String, Long>> input = ...;
          +DataStream<SensorReading> input = ...;

          // for folding incremental computation
          input
          .keyBy(<key selector>)
          .window(<window assigner>)

          - .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());
          + .apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction());
          +
          +/* ... */
          +
          +private static class myFoldFunction implements FoldFunction<SensorReading, Long> {
          --- End diff --

          Please remove double space & fix lowercase class name.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2368#discussion_r75894020

          — Diff: docs/apis/streaming/windows.md —
          @@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit of incremental window compu
          the additional meta information that writing a `WindowFunction` provides.

          This is an example that shows how incremental aggregation functions can be combined with
          -a `WindowFunction`.
          +a `WindowFunction`. The `FoldFunction`/`WindowFunction` example shows how to extract the
          +ending event-time of a window of sensor readings that contain a timestamp,
          +and the `ReduceFunction`/`WindowFunctions` example shows how to do eager window
          +aggregation (only a single element is kept in the window).

          <div class="codetabs" markdown="1">
          <div data-lang="java" markdown="1">

          {% highlight java %}

          -DataStream<Tuple2<String, Long>> input = ...;
          +DataStream<SensorReading> input = ...;

          // for folding incremental computation
          input
          .keyBy(<key selector>)
          .window(<window assigner>)

          - .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());
          + .apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction());
          +
          +/* ... */
          +
          +private static class myFoldFunction implements FoldFunction<SensorReading, Long> {
          +
          + public Long fold(Long acc, SensorReading s) {
          + return Math.max(acc, s.timestamp());
          + }
          +}
          +
          +private static class MyWindowFunction implements WindowFunction<Long, Long, String, TimeWindow> {
          +
          + public void apply(String key, TimeWindow window, Iterable<Long> timestamps, Collector<Long> out) {
          + out.collect(timestamps.iterator().next());
          + }
          +}

          // for reducing incremental computation
          input
          .keyBy(<key selector>)
          .window(<window assigner>)
          .apply(new MyReduceFunction(), new MyWindowFunction());
          +
          +/* ... */
          +
          +private static class myReduceFunction implements ReduceFunction<SensorReading> {
          — End diff –

          Please remove double space & fix lowercase class name.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2368#discussion_r75894336

          — Diff: docs/apis/streaming/windows.md —
          @@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit of incremental window compu
          the additional meta information that writing a `WindowFunction` provides.

          This is an example that shows how incremental aggregation functions can be combined with
          -a `WindowFunction`.
          +a `WindowFunction`. The `FoldFunction`/`WindowFunction` example shows how to extract the
          +ending event-time of a window of sensor readings that contain a timestamp,
          +and the `ReduceFunction`/`WindowFunctions` example shows how to do eager window
          +aggregation (only a single element is kept in the window).

          <div class="codetabs" markdown="1">
          <div data-lang="java" markdown="1">

          {% highlight java %}

          -DataStream<Tuple2<String, Long>> input = ...;
          +DataStream<SensorReading> input = ...;

          // for folding incremental computation
          input
          .keyBy(<key selector>)
          .window(<window assigner>)

          - .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());
          + .apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction());
          +
          +/* ... */
          +
          +private static class myFoldFunction implements FoldFunction<SensorReading, Long> {
          +
          + public Long fold(Long acc, SensorReading s) {
          + return Math.max(acc, s.timestamp());
          + }
          +}
          +
          +private static class MyWindowFunction implements WindowFunction<Long, Long, String, TimeWindow> {
          +
          + public void apply(String key, TimeWindow window, Iterable<Long> timestamps, Collector<Long> out) {
          + out.collect(timestamps.iterator().next());
          --- End diff --

          Not sure if this is a good example. The same result could be achieved by a single `FoldFunction`. How about the `FoldFunction` counts the number of records and the `WindowFunction` emits a `Tuple3` of key, end time of window, and count?
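The suggestion above can be sketched in plain Java as follows. This is an illustration under stated assumptions, not the code that ended up in the Flink docs: `CountPerWindowSketch`, `Window`, `KeyEndCount`, and `process` are hypothetical names, and the window step here takes a single folded value and returns a result, whereas the `WindowFunction` quoted in this thread receives an `Iterable` and a `Collector`.

```java
import java.util.List;

// Plain-Java stand-in for "fold counts records, window function adds key and window end".
// None of these types are Flink classes.
final class CountPerWindowSketch {

    /** Hypothetical time window with start and end timestamps in milliseconds. */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    /** Hypothetical result triple: key, window end time, record count. */
    static final class KeyEndCount {
        final String key;
        final long windowEnd;
        final long count;

        KeyEndCount(String key, long windowEnd, long count) {
            this.key = key;
            this.windowEnd = windowEnd;
            this.count = count;
        }
    }

    /** Fold step: called once per arriving record; only the running count is kept. */
    static <T> long fold(long acc, T record) {
        return acc + 1;
    }

    /** Window step: called once per window with the single folded value and the window metadata. */
    static KeyEndCount applyWindow(String key, Window window, long foldedCount) {
        return new KeyEndCount(key, window.end, foldedCount);
    }

    /** Drives both steps for one key and one window. */
    static <T> KeyEndCount process(String key, Window window, List<T> records) {
        long acc = 0L; // initial value of the fold
        for (T record : records) {
            acc = fold(acc, record);
        }
        return applyWindow(key, window, acc);
    }
}
```

The point of the split is that the count cannot know the key or the window end on its own, and the window step cannot count without buffering; together they need only one `long` of state per window.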

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2368#discussion_r75895039

          — Diff: docs/apis/streaming/windows.md —
          @@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit of incremental window compu
          the additional meta information that writing a `WindowFunction` provides.

          This is an example that shows how incremental aggregation functions can be combined with
          -a `WindowFunction`.
          +a `WindowFunction`. The `FoldFunction`/`WindowFunction` example shows how to extract the
          +ending event-time of a window of sensor readings that contain a timestamp,
          +and the `ReduceFunction`/`WindowFunctions` example shows how to do eager window
          +aggregation (only a single element is kept in the window).

          <div class="codetabs" markdown="1">
          <div data-lang="java" markdown="1">

          {% highlight java %}

          -DataStream<Tuple2<String, Long>> input = ...;
          +DataStream<SensorReading> input = ...;

          // for folding incremental computation
          input
          .keyBy(<key selector>)
          .window(<window assigner>)

          - .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());
          + .apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction());
          +
          +/* ... */
          +
          +private static class myFoldFunction implements FoldFunction<SensorReading, Long> {
          +
          + public Long fold(Long acc, SensorReading s) {
          + return Math.max(acc, s.timestamp());
          + }
          +}
          +
          +private static class MyWindowFunction implements WindowFunction<Long, Long, String, TimeWindow> {
          +
          + public void apply(String key, TimeWindow window, Iterable<Long> timestamps, Collector<Long> out) {
          + out.collect(timestamps.iterator().next());
          + }
          +}

          // for reducing incremental computation
          input
          .keyBy(<key selector>)
          .window(<window assigner>)
          .apply(new MyReduceFunction(), new MyWindowFunction());
          +
          +/* ... */
          +
          +private static class myReduceFunction implements ReduceFunction<SensorReading> {
          +
          + public SensorReading reduce(SensorReading s1, SensorReading s2) {
          + return s1;
          + }
          +}
          +
          +private static class MyWindowFunction implements WindowFunction<SensorReading, SensorReading, String, TimeWindow> {
          +
          + public void apply(String key, TimeWindow window, Iterable<SensorReading> readings, Collector<SensorReading> out) {
          + out.collect(readings.iterator().next());
          — End diff –

          Not sure if this is a good example. The same result could be achieved by a single `ReduceFunction`. How about the `ReduceFunction` searches for a `SensorReading` with a minimum value and the `WindowFunction` emits a `Tuple2` of start time of window and minimum `SensorReading`?
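A plain-Java sketch of this second suggestion might look like the following. It is only an illustration: `MinPerWindowSketch`, `Window`, `StartAndMin`, and `process` are hypothetical names, and the fields of `SensorReading` are assumed here (the real class comes from dataArtisans/blogposts and may differ). As in the thread's own code, the real `WindowFunction` works with an `Iterable` and a `Collector`; the sketch simplifies that to a single value and a return value.

```java
import java.util.Iterator;
import java.util.List;

// Plain-Java stand-in for "reduce keeps the minimum reading, window function adds the window start".
// None of these types are Flink classes.
final class MinPerWindowSketch {

    /** Assumed shape of a sensor reading; field names are hypothetical. */
    static final class SensorReading {
        final String sensorId;
        final long timestamp;
        final double value;

        SensorReading(String sensorId, long timestamp, double value) {
            this.sensorId = sensorId;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    /** Hypothetical time window with start and end timestamps in milliseconds. */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    /** Hypothetical result pair: window start time and the minimum reading. */
    static final class StartAndMin {
        final long windowStart;
        final SensorReading min;

        StartAndMin(long windowStart, SensorReading min) {
            this.windowStart = windowStart;
            this.min = min;
        }
    }

    /** Reduce step: keeps whichever reading has the smaller value (the first one on ties). */
    static SensorReading reduce(SensorReading a, SensorReading b) {
        return a.value <= b.value ? a : b;
    }

    /** Window step: pairs the single reduced reading with the window's start time. */
    static StartAndMin applyWindow(Window window, SensorReading reduced) {
        return new StartAndMin(window.start, reduced);
    }

    /** Drives both steps for one non-empty window. */
    static StartAndMin process(Window window, List<SensorReading> readings) {
        Iterator<SensorReading> it = readings.iterator();
        if (!it.hasNext()) {
            throw new IllegalArgumentException("a window needs at least one reading");
        }
        SensorReading acc = it.next(); // the only reading held for this window
        while (it.hasNext()) {
            acc = reduce(acc, it.next());
        }
        return applyWindow(window, acc);
    }
}
```

Unlike the `return s1;` reducer in the quoted diff, the reduce step here does real work, and the window step contributes something the reducer cannot compute by itself, namely the window start.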

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/2368

          Hi @danielblazevski, thanks for the PR. I added some comments and suggestions.

          Thank you, Fabian

          githubbot ASF GitHub Bot added a comment -

          Github user danielblazevski commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2368#discussion_r75906787

          — Diff: docs/apis/streaming/windows.md —
          @@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit of incremental window compu
          the additional meta information that writing a `WindowFunction` provides.

          This is an example that shows how incremental aggregation functions can be combined with
          -a `WindowFunction`.
          +a `WindowFunction`. The `FoldFunction`/`WindowFunction` example shows how to extract the
          +ending event-time of a window of sensor readings that contain a timestamp,
          +and the `ReduceFunction`/`WindowFunctions` example shows how to do eager window
          +aggregation (only a single element is kept in the window).

          <div class="codetabs" markdown="1">
          <div data-lang="java" markdown="1">

          {% highlight java %}

          -DataStream<Tuple2<String, Long>> input = ...;
          +DataStream<SensorReading> input = ...;

          // for folding incremental computation
          input
          .keyBy(<key selector>)
          .window(<window assigner>)

          - .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());
          + .apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction());
          +
          +/* ... */
          +
          +private static class myFoldFunction implements FoldFunction<SensorReading, Long> {
          +
          + public Long fold(Long acc, SensorReading s) {
          + return Math.max(acc, s.timestamp());
          + }
          +}
          +
          +private static class MyWindowFunction implements WindowFunction<Long, Long, String, TimeWindow> {
          +
          + public void apply(String key, TimeWindow window, Iterable<Long> timestamps, Collector<Long> out) {
          + out.collect(timestamps.iterator().next());
          + }
          +}

          // for reducing incremental computation
          input
          .keyBy(<key selector>)
          .window(<window assigner>)
          .apply(new MyReduceFunction(), new MyWindowFunction());
          +
          +/* ... */
          +
          +private static class myReduceFunction implements ReduceFunction<SensorReading> {
          — End diff –

          Done

          githubbot ASF GitHub Bot added a comment -

          Github user danielblazevski commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2368#discussion_r75906795

          — Diff: docs/apis/streaming/windows.md —
          @@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit of incremental window compu
          the additional meta information that writing a `WindowFunction` provides.

          This is an example that shows how incremental aggregation functions can be combined with
          -a `WindowFunction`.
          +a `WindowFunction`. The `FoldFunction`/`WindowFunction` example shows how to extract the
          +ending event-time of a window of sensor readings that contain a timestamp,
          +and the `ReduceFunction`/`WindowFunctions` example shows how to do eager window
          +aggregation (only a single element is kept in the window).

          <div class="codetabs" markdown="1">
          <div data-lang="java" markdown="1">

          {% highlight java %}

          -DataStream<Tuple2<String, Long>> input = ...;
          +DataStream<SensorReading> input = ...;

          // for folding incremental computation
          input
          .keyBy(<key selector>)
          .window(<window assigner>)

          -    .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());
          +    .apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction());
          +
          +/* ... */
          +
          +private static class myFoldFunction implements FoldFunction<SensorReading, Long> {
          — End diff –

          Done

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/2368

          Thanks for the fast update @danielblazevski.
          I think you accidentally added `tools/FlinkCodyStyle.xml` to your commit.

          githubbot ASF GitHub Bot added a comment -

          Github user danielblazevski commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2368#discussion_r75958831

          — Diff: docs/apis/streaming/windows.md —
          @@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit of incremental window compu
          the additional meta information that writing a `WindowFunction` provides.

          This is an example that shows how incremental aggregation functions can be combined with
          -a `WindowFunction`.
          +a `WindowFunction`. The `FoldFunction`/`WindowFunction` example shows how to extract the
          +ending event-time of a window of sensor readings that contain a timestamp,
          +and the `ReduceFunction`/`WindowFunctions` example shows how to do eager window
          +aggregation (only a single element is kept in the window).

          <div class="codetabs" markdown="1">
          <div data-lang="java" markdown="1">

          {% highlight java %}

          -DataStream<Tuple2<String, Long>> input = ...;
          +DataStream<SensorReading> input = ...;

          // for folding incremental computation
          input
          .keyBy(<key selector>)
          .window(<window assigner>)

          -    .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());
          +    .apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction());
          +
          +/* ... */
          +
          +private static class myFoldFunction implements FoldFunction<SensorReading, Long> {
          +
          +    public Long fold(Long acc, SensorReading s) {
          +        return Math.max(acc, s.timestamp());
          +    }
          +}
          +
          +private static class MyWindowFunction implements WindowFunction<Long, Long, String, TimeWindow> {
          +
          +    public void apply(String key, TimeWindow window, Iterable<Long> timestamps, Collector<Long> out) {
          +        out.collect(timestamps.iterator().next());
          — End diff –

          Can the latter also be done using a single `FoldFunction`? E.g.:

          ```java
          public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc, SensorReading s) {
              Integer cur = acc.getField(2);
              return new Tuple3<String, Long, Integer>(
                  s.sensorId(), Math.max(acc.getField(1), s.timestamp()), cur + 1);
          }
          ```

          githubbot ASF GitHub Bot added a comment -

          Github user danielblazevski commented on the issue:

          https://github.com/apache/flink/pull/2368

          Sorry, indeed; I added that while playing around with the open PR to add `FlinkCodyStyle.xml`.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2368#discussion_r75959873

          — Diff: docs/apis/streaming/windows.md —
          @@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit of incremental window compu
          the additional meta information that writing a `WindowFunction` provides.

          This is an example that shows how incremental aggregation functions can be combined with
          -a `WindowFunction`.
          +a `WindowFunction`. The `FoldFunction`/`WindowFunction` example shows how to extract the
          +ending event-time of a window of sensor readings that contain a timestamp,
          +and the `ReduceFunction`/`WindowFunctions` example shows how to do eager window
          +aggregation (only a single element is kept in the window).

          <div class="codetabs" markdown="1">
          <div data-lang="java" markdown="1">

          {% highlight java %}

          -DataStream<Tuple2<String, Long>> input = ...;
          +DataStream<SensorReading> input = ...;

          // for folding incremental computation
          input
          .keyBy(<key selector>)
          .window(<window assigner>)

          -    .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());
          +    .apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction());
          +
          +/* ... */
          +
          +private static class myFoldFunction implements FoldFunction<SensorReading, Long> {
          +
          +    public Long fold(Long acc, SensorReading s) {
          +        return Math.max(acc, s.timestamp());
          +    }
          +}
          +
          +private static class MyWindowFunction implements WindowFunction<Long, Long, String, TimeWindow> {
          +
          +    public void apply(String key, TimeWindow window, Iterable<Long> timestamps, Collector<Long> out) {
          +        out.collect(timestamps.iterator().next());
          — End diff –

          `Math.max(acc.getField(1), s.timestamp())` will give you the timestamp of the last element that was added to the window (assuming elements arrive in event-time order). By the end time of the window I meant the timestamp after which an element would be placed in the next window (for an hourly tumbling window this would be `00:59:59.999` for the window from `00:00:00.000` to `00:59:59.999`). This information is only available in a `WindowFunction` through the `TimeWindow` object.
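
The two notions of "end time" can be told apart with a small sketch in plain Java, with no Flink dependency. The class `WindowBounds` and its methods are hypothetical names made up for illustration. The arithmetic assumes epoch-aligned tumbling windows without an offset, non-negative millisecond timestamps, and an exclusive window end, so the last timestamp that still belongs to a window is `end - 1`.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper, not part of Flink: bounds of an epoch-aligned tumbling window. */
final class WindowBounds {

    /** Inclusive start of the window containing the timestamp (assumes timestampMs >= 0). */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Exclusive end of the window; an element at this timestamp falls into the next window. */
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    /** Largest timestamp among the elements: all that a max-fold over the elements can see. */
    static long maxElementTimestamp(List<Long> timestampsMs) {
        long max = Long.MIN_VALUE;
        for (long t : timestampsMs) {
            max = Math.max(max, t);
        }
        return max;
    }

    public static void main(String[] args) {
        long hour = 60 * 60 * 1000L;
        // Three readings that all fall into the first hourly window.
        List<Long> readings = Arrays.asList(5_000L, 1_200_000L, 2_400_000L);
        long lastSeen = maxElementTimestamp(readings);
        long end = windowEnd(readings.get(0), hour);
        System.out.println("max element timestamp: " + lastSeen);
        System.out.println("last timestamp in window: " + (end - 1));
    }
}
```

The fold only ever sees element timestamps, so it can report at most `2400000` here, while the window itself runs up to `3599999` (that is, `00:59:59.999`). That second value depends on the window definition rather than on the data, which is why it has to come from the window object.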

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/2368

          No worries

          githubbot ASF GitHub Bot added a comment -

          Github user danielblazevski commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2368#discussion_r75961552

          — Diff: docs/apis/streaming/windows.md —
          @@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit of incremental window compu
          the additional meta information that writing a `WindowFunction` provides.

          This is an example that shows how incremental aggregation functions can be combined with
          -a `WindowFunction`.
          +a `WindowFunction`. The `FoldFunction`/`WindowFunction` example shows how to extract the
          +ending event-time of a window of sensor readings that contain a timestamp,
          +and the `ReduceFunction`/`WindowFunctions` example shows how to do eager window
          +aggregation (only a single element is kept in the window).

          <div class="codetabs" markdown="1">
          <div data-lang="java" markdown="1">

          {% highlight java %}

          -DataStream<Tuple2<String, Long>> input = ...;
          +DataStream<SensorReading> input = ...;

          // for folding incremental computation
          input
          .keyBy(<key selector>)
          .window(<window assigner>)

          -    .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());
          +    .apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction());
          +
          +/* ... */
          +
          +private static class myFoldFunction implements FoldFunction<SensorReading, Long> {
          +
          +    public Long fold(Long acc, SensorReading s) {
          +        return Math.max(acc, s.timestamp());
          +    }
          +}
          +
          +private static class MyWindowFunction implements WindowFunction<Long, Long, String, TimeWindow> {
          +
          +    public void apply(String key, TimeWindow window, Iterable<Long> timestamps, Collector<Long> out) {
          +        out.collect(timestamps.iterator().next());
          — End diff –

          Oh gotcha, different notion of end time. Makes sense now.

          githubbot ASF GitHub Bot added a comment -

          Github user danielblazevski commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2368#discussion_r75973261

          — Diff: docs/apis/streaming/windows.md —
          @@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit of incremental window compu
          the additional meta information that writing a `WindowFunction` provides.

          This is an example that shows how incremental aggregation functions can be combined with
          -a `WindowFunction`.
          +a `WindowFunction`. The `FoldFunction`/`WindowFunction` example shows how to extract the
          +ending event-time of a window of sensor readings that contain a timestamp,
          +and the `ReduceFunction`/`WindowFunctions` example shows how to do eager window
          +aggregation (only a single element is kept in the window).

          <div class="codetabs" markdown="1">
          <div data-lang="java" markdown="1">

          {% highlight java %}

          -DataStream<Tuple2<String, Long>> input = ...;
          +DataStream<SensorReading> input = ...;

          // for folding incremental computation
          input
          .keyBy(<key selector>)
          .window(<window assigner>)

          -    .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());
          +    .apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction());
          +
          +/* ... */
          +
          +private static class myFoldFunction implements FoldFunction<SensorReading, Long> {
          +
          +    public Long fold(Long acc, SensorReading s) {
          +        return Math.max(acc, s.timestamp());
          +    }
          +}
          +
          +private static class MyWindowFunction implements WindowFunction<Long, Long, String, TimeWindow> {
          +
          +    public void apply(String key, TimeWindow window, Iterable<Long> timestamps, Collector<Long> out) {
          +        out.collect(timestamps.iterator().next());
          — End diff –

          @fhueske does this look OK for this case? If so, I'll finish things up by adding the Reduce example and add both corresponding Scala examples
          ```java
          // for folding incremental computation
          input
              .keyBy(<key selector>)
              .window(<window assigner>)
              .apply(new Tuple3<String, Long, Integer>("", 0L, 0), new MyFoldFunction(), new MyWindowFunction());

          /* ... */

          private static class MyFoldFunction
                  implements FoldFunction<SensorReading, Tuple3<String, Long, Integer>> {

              // Only the count (field 2) changes; key and time are passed through.
              public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc, SensorReading s) {
                  Integer cur = acc.getField(2);
                  return new Tuple3<String, Long, Integer>(acc.getField(0), acc.getField(1), cur + 1);
              }
          }

          private static class MyWindowFunction
                  implements WindowFunction<Tuple3<String, Long, Integer>, Tuple3<String, Long, Integer>, String, TimeWindow> {

              // Emits (key, window end, count) once the window fires.
              public void apply(String s,
                                TimeWindow window,
                                Iterable<Tuple3<String, Long, Integer>> counts,
                                Collector<Tuple3<String, Long, Integer>> out) {
                  out.collect(new Tuple3<String, Long, Integer>(s, window.getEnd(), counts.iterator().next().getField(2)));
              }
          }
          ```

          I found that I had to have the `FoldFunction` include `Tuple3` in its signature since the `WindowFunction` must be of the form `WindowFunction<ACC, ACC, K, W>` according to [here](https://github.com/apache/flink/blob/b8299bf92d8e3dbe140dd89602699394019b783d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java)

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2368#discussion_r76008397

          — Diff: docs/apis/streaming/windows.md —
          @@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit of incremental window compu
          the additional meta information that writing a `WindowFunction` provides.

          This is an example that shows how incremental aggregation functions can be combined with
          -a `WindowFunction`.
          +a `WindowFunction`. The `FoldFunction`/`WindowFunction` example shows how to extract the
          +ending event-time of a window of sensor readings that contain a timestamp,
          +and the `ReduceFunction`/`WindowFunctions` example shows how to do eager window
          +aggregation (only a single element is kept in the window).

          <div class="codetabs" markdown="1">
          <div data-lang="java" markdown="1">

          {% highlight java %}

          -DataStream<Tuple2<String, Long>> input = ...;
          +DataStream<SensorReading> input = ...;

          // for folding incremental computation
          input
          .keyBy(<key selector>)
          .window(<window assigner>)

          -    .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());
          +    .apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction());
          +
          +/* ... */
          +
          +private static class myFoldFunction implements FoldFunction<SensorReading, Long> {
          +
          +    public Long fold(Long acc, SensorReading s) {
          +        return Math.max(acc, s.timestamp());
          +    }
          +}
          +
          +private static class MyWindowFunction implements WindowFunction<Long, Long, String, TimeWindow> {
          +
          +    public void apply(String key, TimeWindow window, Iterable<Long> timestamps, Collector<Long> out) {
          +        out.collect(timestamps.iterator().next());
          — End diff –

          The example looks good, thanks! Two minor suggestions: 1) I think we can omit setting key and time in the `FoldFunction`, 2) the `WindowFunction` could fetch the count in a separate variable. This would make the `out.collect` line a bit shorter.

          Regarding the type restriction: You discovered a bug that we would like to fix but can't until Flink 2.0.0 because we promoted the interface to be `@Public` and the API is stable in Flink 1.0 releases. IMO it makes sense to point out this accidental restriction in the documentation.
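
The division of labour suggested here can be modelled in plain Java without any Flink dependency. Everything in this sketch is hypothetical (`FoldThenWindowSketch`, `Result`, `fold`, `applyWindow`, `process`) and only mimics the shape of the example under review: `Result` stands in for `Tuple3<String, Long, Integer>`, and the accumulator deliberately has the same type as the output to mirror the accidental restriction discussed above. It is a simplified model, not Flink's actual execution path.

```java
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of fold-then-window-function; all names are hypothetical, not Flink API. */
final class FoldThenWindowSketch {

    /** Stand-in for Tuple3<String, Long, Integer>: key, window end, element count. */
    static final class Result {
        final String key;
        final long windowEnd;
        final int count;

        Result(String key, long windowEnd, int count) {
            this.key = key;
            this.windowEnd = windowEnd;
            this.count = count;
        }
    }

    /** Fold step: only the count changes; key and window end pass through untouched. */
    static Result fold(Result acc, long readingTimestamp) {
        return new Result(acc.key, acc.windowEnd, acc.count + 1);
    }

    /** Window step: runs once when the window fires and fills in key and window end. */
    static Result applyWindow(String key, long windowEnd, Result folded) {
        int count = folded.count; // fetched separately to keep the next line short
        return new Result(key, windowEnd, count);
    }

    /** Drives both steps; only one accumulator is held while elements arrive. */
    static Result process(String key, long windowEnd, List<Long> timestamps) {
        Result acc = new Result("", 0L, 0); // placeholder key and time, zero count
        for (long ts : timestamps) {
            acc = fold(acc, ts);
        }
        return applyWindow(key, windowEnd, acc);
    }

    public static void main(String[] args) {
        Result r = process("sensor-1", 3_600_000L, Arrays.asList(10L, 20L, 30L));
        System.out.println(r.key + ", " + r.windowEnd + ", " + r.count);
    }
}
```

In this model the fold never needs to know the key or the window, which is the point of suggestion 1; the placeholder fields exist only because accumulator and output share a type.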

          githubbot ASF GitHub Bot added a comment -

          Github user danielblazevski commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2368#discussion_r76102678

          — Diff: docs/apis/streaming/windows.md —
          @@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit of incremental window compu
          the additional meta information that writing a `WindowFunction` provides.

          This is an example that shows how incremental aggregation functions can be combined with
          -a `WindowFunction`.
          +a `WindowFunction`. The `FoldFunction`/`WindowFunction` example shows how to extract the
          +ending event-time of a window of sensor readings that contain a timestamp,
          +and the `ReduceFunction`/`WindowFunctions` example shows how to do eager window
          +aggregation (only a single element is kept in the window).

          <div class="codetabs" markdown="1">
          <div data-lang="java" markdown="1">

          {% highlight java %}

          -DataStream<Tuple2<String, Long>> input = ...;
          +DataStream<SensorReading> input = ...;

          // for folding incremental computation
          input
          .keyBy(<key selector>)
          .window(<window assigner>)

          -    .apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());
          +    .apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction());
          +
          +/* ... */
          +
          +private static class myFoldFunction implements FoldFunction<SensorReading, Long> {
          +
          +    public Long fold(Long acc, SensorReading s) {
          +        return Math.max(acc, s.timestamp());
          +    }
          +}
          +
          +private static class MyWindowFunction implements WindowFunction<Long, Long, String, TimeWindow> {
          +
          +    public void apply(String key, TimeWindow window, Iterable<Long> timestamps, Collector<Long> out) {
          +        out.collect(timestamps.iterator().next());
          — End diff –

          Made the changes in the Java version and added the comments. I had some issues with the Scala version; see the screenshots. The only real change is to the type of the `Iterable` in the `WindowFunction`, which IntelliJ says has to be `SensorReading`, which is not ideal. I removed the Scala version for now.

          <img width="426" alt="screenshot 2016-08-24 13 28 12" src="https://cloud.githubusercontent.com/assets/10012612/17940967/4a025738-69ff-11e6-9354-31c2ead563d4.png">

          <img width="625" alt="screenshot 2016-08-24 13 27 51" src="https://cloud.githubusercontent.com/assets/10012612/17940972/4dd5db28-69ff-11e6-8c6a-11b1900796ad.png">
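
          For readers following the diff above, here is a minimal plain-Java sketch of the fold-then-window-function idea, written without Flink on the classpath. It is not the code from the pull request: the class `FoldThenWindowSketch`, the nested `SensorReading` and `Window` types, and the `process` and `emit` helpers are hypothetical stand-ins. It only illustrates that the fold keeps a single accumulated value per window, and that the window step later sees that one value together with the window metadata.

          ```java
          import java.util.Arrays;
          import java.util.List;

          public class FoldThenWindowSketch {

              /** Hypothetical stand-in for the SensorReading type named in the diff. */
              static final class SensorReading {
                  final String sensorId;
                  final long timestamp;

                  SensorReading(String sensorId, long timestamp) {
                      this.sensorId = sensorId;
                      this.timestamp = timestamp;
                  }
              }

              /** Hypothetical stand-in for a time window covering [start, end). */
              static final class Window {
                  final long start;
                  final long end;

                  Window(long start, long end) {
                      this.start = start;
                      this.end = end;
                  }
              }

              /** Fold step: keeps only the largest timestamp seen so far. */
              static long fold(long acc, SensorReading r) {
                  return Math.max(acc, r.timestamp);
              }

              /** Window step: receives the single pre-aggregated value plus window metadata. */
              static String emit(String key, Window w, long maxTimestamp) {
                  return key + " window_end=" + w.end + " max_ts=" + maxTimestamp;
              }

              /** Applies the fold eagerly per element, then calls the window step once. */
              static String process(String key, Window w, List<SensorReading> readings) {
                  long acc = Long.MIN_VALUE; // same initial value as in the diff
                  for (SensorReading r : readings) {
                      acc = fold(acc, r); // only acc is retained between elements
                  }
                  return emit(key, w, acc);
              }

              public static void main(String[] args) {
                  List<SensorReading> readings = Arrays.asList(
                          new SensorReading("s1", 5L),
                          new SensorReading("s1", 7L),
                          new SensorReading("s1", 3L));
                  System.out.println(process("s1", new Window(0L, 10L), readings));
              }
          }
          ```

          In this sketch the readings themselves are never buffered; only the running maximum survives until the window step runs, which is the property the issue description calls eager window aggregation.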

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2368#discussion_r76115561


          Thanks for the update.
          The following Scala code does not show an error in my IntelliJ:

          ```
          val readings: DataStream[SensorReading] = ???

          val result: DataStream[(String, Long, Int)] = readings
            .keyBy(_.sensorId)
            .timeWindow(Time.minutes(1), Time.seconds(10))
            .apply(
              ("", 0L, 0),
              (acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) },
              (k: String, w: TimeWindow, cnts: Iterable[(String, Long, Int)], out: Collector[(String, Long, Int)]) => {
                val cnt = cnts.iterator.next()
                out.collect((k, w.getEnd, cnt._3))
              }
            )
          ```

          Thanks, Fabian

          githubbot ASF GitHub Bot added a comment -

          Github user danielblazevski commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2368#discussion_r76120063


          Ah, lol, `Int` vs `Integer`...

          githubbot ASF GitHub Bot added a comment -

          Github user danielblazevski commented on the issue:

          https://github.com/apache/flink/pull/2368

          Pushed the Scala example

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on the issue:

          https://github.com/apache/flink/pull/2368

          Thanks for the update! The example looks good. I'll do some minor reformatting and merge the PR.

          fhueske Fabian Hueske added a comment -

          Fixed for 1.2 with 717fc906db6db51596320853a54ffeb92b1a591f
          Fixed for 1.1 with 717fc906db6db51596320853a54ffeb92b1a591f

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/2368


            People

            • Assignee: Daniel Blazevski (danielblazevski)
            • Reporter: Fabian Hueske (fhueske)
            • Votes: 0
            • Watchers: 4
