Details

    • Type: Sub-task
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.2.0, 1.3.0
    • Component/s: Documentation
    • Labels:
      None

      Description

      Suggested Outline:

      Windows
      
      (0) Outline: The anatomy of a window operation
      
        stream
           [.keyBy(...)]         <-  keyed versus non-keyed windows
            .window(...)         <-  required: "assigner"
           [.trigger(...)]       <-  optional: "trigger" (else default trigger)
           [.evictor(...)]       <-  optional: "evictor" (else no evictor)
           [.allowedLateness()]  <-  optional, else zero
            .reduce/fold/apply() <-  required: "function"
      
      (1) Types of windows
      
        - tumble
        - slide
        - session
        - global
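For the tumble and slide types, assigning an element to its windows is plain arithmetic on its timestamp. The sketch below is a minimal plain-Java model and does not use Flink. It assumes windows are aligned to the epoch shifted by an `offset`, with start = ts - (ts - offset + size) % size. As far as we know, this matches the arithmetic in Flink's `TimeWindow.getWindowStartWithOffset`, but the class `WindowAlignment` and its methods are hypothetical. The example reuses the case from the PR discussion: hourly windows sliding by 30 minutes with a 15 minute offset.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of time-window assignment; plain Java, not Flink code.
final class WindowAlignment {

    static final long MIN = 60_000L; // one minute in milliseconds

    // Start of the window of length `size` that contains `ts`, when windows
    // are aligned to the epoch shifted by `offset` (all values in ms).
    // Assumes ts - offset + size >= 0 so that % stays non-negative.
    static long windowStart(long ts, long offset, long size) {
        return ts - (ts - offset + size) % size;
    }

    // All sliding windows of length `size`, starting every `slide` ms, that
    // contain `ts`. Each window is a {start, end} pair with an exclusive end.
    static List<long[]> slidingWindows(long ts, long size, long slide, long offset) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = windowStart(ts, offset, slide);
        for (long start = lastStart; start > ts - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        long ts = 80 * MIN; // an element at 1:20 after the epoch
        // Hourly windows sliding by 30 minutes, offset by 15 minutes.
        for (long[] w : slidingWindows(ts, 60 * MIN, 30 * MIN, 15 * MIN)) {
            System.out.println(w[0] / MIN + " min - " + w[1] / MIN + " min");
        }
    }
}
```

For the element at 1:20, `main` prints `75 min - 135 min` and `45 min - 105 min`. These are the windows `1:15 - 2:15` and `0:45 - 1:45` with exclusive ends. A tumbling window is the special case slide == size, so `windowStart(ts, offset, size)` alone gives its start. With an offset of -8 hours and a size of one day, a window starts at 16:00 UTC, which is midnight in UTC+8.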
      
      (2) Pre-defined windows
      
         timeWindow() (tumble, slide)
         countWindow() (tumble, slide)
           - mention that count windows are inherently
             resource leaky unless the key space is limited
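The leak comes from the fact that a count window is only released when it fills up. The sketch below is a minimal plain-Java model and not Flink's `countWindow()` implementation. The class `CountWindowLeak` is hypothetical. It shows that keys which never reach the count keep their partial window forever, so state grows with the number of distinct keys.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of keyed tumbling count windows; plain Java, not Flink code.
// Each key buffers elements until `count` of them have arrived.
final class CountWindowLeak {

    private final int count;
    private final Map<String, List<Integer>> buffers = new HashMap<>();

    CountWindowLeak(int count) {
        this.count = count;
    }

    // Adds an element to its key's window; returns the window contents when
    // the window fires, otherwise null. In this model a key's state is freed
    // only when its window fills up; there is no time-based cleanup.
    List<Integer> add(String key, int value) {
        List<Integer> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(value);
        if (buffer.size() == count) {
            buffers.remove(key);
            return buffer;
        }
        return null;
    }

    // Number of keys that currently hold a partially filled window.
    int keysWithState() {
        return buffers.size();
    }

    public static void main(String[] args) {
        CountWindowLeak op = new CountWindowLeak(3);
        // 1000 distinct keys (say, short-lived session ids) seen once each:
        for (int i = 0; i < 1000; i++) {
            op.add("session-" + i, i);
        }
        // None reaches 3 elements, so all 1000 partial windows stay in state.
        System.out.println(op.keysWithState()); // 1000
    }
}
```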
      
      (3) Window Functions
      
        - apply: most basic, iterates over elements in window
        
        - aggregating: reduce and fold; can be combined with "apply()", which then gets a single (pre-aggregated) element
        
        - forward reference to state size section
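The difference between a plain apply and reduce followed by apply is easiest to see from what the apply function receives. The sketch below is a minimal plain-Java model. `WindowFunctions` and its methods are hypothetical and are not Flink's `WindowFunction` or `ReduceFunction` interfaces. Apply alone buffers and iterates over every element. Reduce keeps one running value, so the apply function is handed exactly one element.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical model of one window's contents; plain Java, not Flink's
// WindowFunction / ReduceFunction interfaces.
final class WindowFunctions {

    // apply() alone: the window buffers every element (state grows with the
    // window) and the function iterates over all of them when it fires.
    static <T, R> R applyOverBuffer(List<T> arriving, Function<Iterable<T>, R> fn) {
        List<T> buffer = new ArrayList<>();
        for (T e : arriving) {
            buffer.add(e);
        }
        return fn.apply(buffer);
    }

    // reduce() combined with apply(): each arriving element is folded into one
    // running value, so the apply function receives exactly one element.
    static <T, R> R reduceThenApply(List<T> arriving, BinaryOperator<T> reducer,
                                    Function<Iterable<T>, R> fn) {
        T acc = null;
        for (T e : arriving) {
            acc = (acc == null) ? e : reducer.apply(acc, e);
        }
        List<T> single = new ArrayList<>();
        if (acc != null) {
            single.add(acc);
        }
        return fn.apply(single);
    }

    // A window function that just reports how many elements it was given.
    static Integer countElements(Iterable<?> elements) {
        int n = 0;
        for (Object ignored : elements) {
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        List<Integer> window = Arrays.asList(3, 1, 4, 1, 5);
        Integer seenByApply = applyOverBuffer(window, WindowFunctions::countElements);
        Integer seenAfterReduce = reduceThenApply(window, Integer::sum, WindowFunctions::countElements);
        System.out.println(seenByApply + " vs " + seenAfterReduce); // 5 vs 1
    }
}
```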
      
      (4) Advanced Windows
      
        - assigner
          - simple
          - merging
        - trigger
          - registering timers (processing time, event time)
          - state in triggers
        - life cycle of a window
          - create
          - state
          - cleanup
            - when is window contents purged
            - when is state dropped
            - when is metadata (like merging set) dropped
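The create and cleanup steps of the life cycle can be modelled for tumbling event-time windows of a single key. In this model, a window is created lazily on its first element and removed once the watermark reaches its end plus the allowed lateness. The sketch is minimal plain Java. `WindowLifecycle` is hypothetical. Whether the watermark must reach or pass the cleanup time is a modelling choice here and may differ from Flink's exact millisecond boundary. It replays the example from the PR text: 5-minute windows with 1 minute of allowed lateness, where the `12:00 - 12:05` window is dropped at `12:06`. Global windows have no end timestamp, so this kind of time-based removal never applies to them.

```java
import java.util.TreeMap;

// Hypothetical model of the lifecycle of tumbling event-time windows for one
// key; plain Java, not Flink code.
final class WindowLifecycle {

    static final long MIN = 60_000L; // one minute in milliseconds

    private final long size;
    private final long allowedLateness;
    // window start -> number of elements; an entry exists only after the
    // first element of that window has arrived.
    private final TreeMap<Long, Integer> windows = new TreeMap<>();

    WindowLifecycle(long size, long allowedLateness) {
        this.size = size;
        this.allowedLateness = allowedLateness;
    }

    // Removal time as described in the text: window end plus allowed lateness.
    long cleanupTime(long start) {
        return start + size + allowedLateness;
    }

    // Creates the window lazily on its first element. Returns false if the
    // watermark has already reached the window's cleanup time (element dropped).
    boolean onElement(long timestamp, long watermark) {
        long start = timestamp - (timestamp % size); // assumes timestamp >= 0
        if (cleanupTime(start) <= watermark) {
            return false;
        }
        windows.merge(start, 1, Integer::sum);
        return true;
    }

    // Drops every window whose cleanup time the watermark has reached.
    void onWatermark(long watermark) {
        windows.keySet().removeIf(start -> cleanupTime(start) <= watermark);
    }

    int liveWindows() {
        return windows.size();
    }

    public static void main(String[] args) {
        long noon = 12 * 60 * MIN;
        WindowLifecycle op = new WindowLifecycle(5 * MIN, 1 * MIN);
        op.onElement(noon + 2 * MIN, noon);           // creates [12:00, 12:05)
        op.onWatermark(noon + 5 * MIN);               // end passed, lateness not yet
        System.out.println(op.liveWindows());         // 1
        op.onElement(noon + 3 * MIN, noon + 5 * MIN); // late but within lateness
        op.onWatermark(noon + 6 * MIN);               // 12:06 reached: window removed
        System.out.println(op.liveWindows());         // 0
    }
}
```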
      
      (5) Late data
        - picture
        - fire vs fire_and_purge: late firings accumulate vs late data resurrects a purged window (cf. discarding mode)
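The accumulate-versus-resurrect distinction shows up as soon as a late element fires a window a second time. The sketch below is a minimal plain-Java model of one window with a sum. `LateFiring` is hypothetical and is not Flink's `Trigger` API. It assumes the window fires once when the watermark passes its end and again for each late element within the allowed lateness, which is our understanding of the default event-time behaviour. With FIRE, the late result includes the earlier elements. With FIRE_AND_PURGE, the late firing only sees the late element.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one event-time window with allowed lateness; plain
// Java, not Flink's Trigger API. The window fires once when the watermark
// passes its end and again for every late element that is still accepted.
final class LateFiring {

    private final boolean purgeOnFire; // true: FIRE_AND_PURGE, false: FIRE
    private final List<Integer> contents = new ArrayList<>();
    private final List<Integer> emitted = new ArrayList<>();

    LateFiring(boolean purgeOnFire) {
        this.purgeOnFire = purgeOnFire;
    }

    void add(int value) {
        contents.add(value);
    }

    // Emits the sum of the current contents. FIRE keeps the contents, so later
    // firings accumulate; FIRE_AND_PURGE clears them, so later firings only see
    // elements that arrived after the purge.
    void fire() {
        int sum = 0;
        for (int v : contents) {
            sum += v;
        }
        emitted.add(sum);
        if (purgeOnFire) {
            contents.clear();
        }
    }

    // A late element within the allowed lateness is added and fires again.
    void lateElement(int value) {
        add(value);
        fire();
    }

    List<Integer> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        for (boolean purge : new boolean[] {false, true}) {
            LateFiring window = new LateFiring(purge);
            window.add(1);
            window.add(2);
            window.fire();          // watermark passes the end of the window
            window.lateElement(10); // late, but within allowed lateness
            System.out.println((purge ? "FIRE_AND_PURGE " : "FIRE ") + window.emitted());
        }
        // FIRE [3, 13]
        // FIRE_AND_PURGE [3, 10]
    }
}
```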
        
      (6) Evictors
        - TBD
        
      (7) State size: How large will the state be?
      
      Basic rule: Each element has one copy per window it is assigned to
        --> num windows * num elements in window
        --> example: tumbling is one copy, sliding(n, m) (size n, slide m) is n/m copies
        --> per key
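The basic rule can be turned into a quick per-key estimate. The sketch below is hypothetical plain Java. `WindowStateSize` is an illustrative name, and the sketch assumes a constant arrival rate, no pre-aggregation, and a window size that is a multiple of the slide. It computes (windows per element) * (elements per window).

```java
// Hypothetical back-of-the-envelope estimate of buffered window state per key
// when no pre-aggregating function is used; plain Java, not Flink code.
final class WindowStateSize {

    // Each element is stored once per window it belongs to: size / slide
    // copies for sliding windows, 1 copy for tumbling windows (slide == size).
    static long copiesPerElement(long sizeMs, long slideMs) {
        if (sizeMs % slideMs != 0) {
            throw new IllegalArgumentException("sketch assumes size is a multiple of slide");
        }
        return sizeMs / slideMs;
    }

    // Steady-state elements held for one key at a constant arrival rate:
    // (windows per element) * (elements per window).
    static long bufferedElements(double elementsPerSecond, long sizeMs, long slideMs) {
        long perWindow = Math.round(elementsPerSecond * sizeMs / 1000.0);
        return copiesPerElement(sizeMs, slideMs) * perWindow;
    }

    public static void main(String[] args) {
        // Tumbling 1-minute windows at 10 elements/s: 1 copy * 600 elements.
        System.out.println(bufferedElements(10, 60_000, 60_000));    // 600
        // 1-hour windows sliding every minute at 10 elements/s: 60 * 36,000.
        System.out.println(bufferedElements(10, 3_600_000, 60_000)); // 2160000
    }
}
```

Multiply the result by the number of keys to estimate total state. With reduce or fold set, the pre-aggregation rule replaces "elements per window" with one value per window.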
      
      Pre-aggregation:
        - if reduce or fold is set -> one element per window (rather than num elements in window)
        - evictor voids pre-aggregation from the perspective of state
      
      Special rules:
        - fold cannot pre-aggregate on session windows (and other merging windows)
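The special rule follows from how merging works. When two session windows merge, their pre-aggregated values must be combined. A reduce function `(T, T) -> T` can do that. A fold function `(ACC, T) -> ACC` has no way to combine two `ACC` values. The sketch below is a hypothetical plain-Java model of gap-based session windows for one key with a sum as the reduce function. `SessionMerge` is illustrative and is not Flink's merging window code. Each element gets its own `[ts, ts + gap)` window, and every overlapping or touching session is merged into it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of session windows for one key with a pre-aggregating
// reduce (sum); plain Java, not Flink's merging window code.
final class SessionMerge {

    // A session window [start, end) that stores only its reduced value.
    static final class Session {
        long start;
        long end;
        int sum;

        Session(long start, long end, int sum) {
            this.start = start;
            this.end = end;
            this.sum = sum;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ") sum=" + sum;
        }
    }

    private final long gap;
    private final List<Session> sessions = new ArrayList<>();

    SessionMerge(long gap) {
        this.gap = gap;
    }

    // Each element first gets its own window [ts, ts + gap). Every existing
    // session that overlaps or touches it is merged in, and the two reduced
    // values are combined with the same reduce function (here: +). A fold
    // accumulator could not be combined like this, so fold would have to keep
    // the raw elements of merging windows instead.
    void add(long ts, int value) {
        Session merged = new Session(ts, ts + gap, value);
        List<Session> untouched = new ArrayList<>();
        for (Session s : sessions) {
            if (s.start <= merged.end && merged.start <= s.end) {
                merged.start = Math.min(merged.start, s.start);
                merged.end = Math.max(merged.end, s.end);
                merged.sum = merged.sum + s.sum; // reduce(a, b)
            } else {
                untouched.add(s);
            }
        }
        untouched.add(merged);
        sessions.clear();
        sessions.addAll(untouched);
    }

    List<Session> sessions() {
        return sessions;
    }

    public static void main(String[] args) {
        SessionMerge op = new SessionMerge(10);
        op.add(0, 1);   // session [0, 10)
        op.add(15, 2);  // separate session [15, 25)
        op.add(8, 4);   // [8, 18) bridges both: merged into [0, 25), sum 7
        op.add(100, 9); // new session [100, 110)
        System.out.println(op.sessions()); // [[0, 25) sum=7, [100, 110) sum=9]
    }
}
```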
      
      
      (8) Non-keyed windows
        - all elements through the same windows
        - currently not parallel
        - possibly parallel in the future when pre-aggregation functions are used
        - inherently (by definition) produce a result stream with parallelism one
        - state similar to one key of keyed windows
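Keyed windows can run in parallel because every element of a key is routed to the same task. Non-keyed windows send everything to a single task. The sketch below is a hypothetical plain-Java routing model that uses `hashCode()` modulo the parallelism. As we understand it, Flink's real routing goes through key groups with a different hash, so actual task indices will differ. `KeyRouting` is an illustrative name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical routing model; Flink's real assignment goes through key groups
// and a different hash, so actual task indices will differ.
final class KeyRouting {

    // Keyed windows: all elements with the same key go to the same task.
    static int taskForKey(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Non-keyed windows: a single task (parallelism 1) sees every element.
    static int taskForNonKeyed() {
        return 0;
    }

    // Groups the distinct keys by the task that would hold their window state.
    static Map<Integer, Set<String>> keysPerTask(List<String> keys, int parallelism) {
        Map<Integer, Set<String>> byTask = new TreeMap<>();
        for (String k : keys) {
            byTask.computeIfAbsent(taskForKey(k, parallelism), t -> new TreeSet<>()).add(k);
        }
        return byTask;
    }

    public static void main(String[] args) {
        List<String> stream = Arrays.asList("user1", "user2", "user3", "user1", "user2");
        System.out.println("keyed:     " + keysPerTask(stream, 4));
        System.out.println("non-keyed: every element -> task " + taskForNonKeyed());
    }
}
```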
      


          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user kl0u opened a pull request:

          https://github.com/apache/flink/pull/3191

          FLINK-5529 FLINK-4752 [docs] Improve / extends windowing documentation

          This PR is for both the issues in the title.
          It refactors/improves/extends the documentation of the windowing logic in Flink 1.2.

          R @aljoscha

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/kl0u/flink window-docs

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3191.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3191


          commit 7ee13d40983831c4ea5db0459ef691c8b974bc6e
          Author: kl0u <kkloudas@gmail.com>
          Date: 2017-01-17T15:51:09Z

          FLINK-5529 [docs] Improve / extends windowing documentation

          commit 6e840fdc2d0e92715488a14e48031c44206254c9
          Author: Fabian Hueske <fhueske@apache.org>
          Date: 2017-01-18T17:57:23Z

          FLINK-4752 [docs] Improve window assigner documentation.


          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3191#discussion_r97822798

          --- Diff: docs/dev/windows.md ---
          @@ -278,38 +343,39 @@ input

          {% highlight scala %}

          val input: DataStream[T] = ...

          -// tumbling event-time windows
          input
          .keyBy(<key selector>)

          - .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
          + .window(GlobalWindows.create())
          .<windowed transformation>(<window function>)
          {% endhighlight %}

            </div>
            </div>

          -## Window Functions
          +## Window Function
          --- End diff --

          All the other section headings for windowing components are pluralised; this one is not. We should keep them consistent and pluralise either all or none.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3191#discussion_r97824676

          --- Diff: docs/dev/windows.md ---
          @@ -23,133 +23,96 @@ specific language governing permissions and limitations
          under the License.
          -->

          -Flink uses a concept called windows to divide a (potentially) infinite `DataStream` into finite
          -slices based on the timestamps of elements or other criteria. This division is required when working
          -with infinite streams of data and performing transformations that aggregate elements.
          -
          -<span class="label label-info">Info</span> We will mostly talk about keyed windowing here, i.e.
          -windows that are applied on a `KeyedStream`. Keyed windows have the advantage that elements are
          -subdivided based on both window and key before being given to
          -a user function. The work can thus be distributed across the cluster
          -because the elements for different keys can be processed independently. If you absolutely have to,
          -you can check out [non-keyed windowing](#non-keyed-windowing) where we describe how non-keyed
          -windows work.
          +Windows are at the heart of processing infinite streams. Windows split the stream into "buckets" of finite size,
          +over which we can apply computations. This document focuses on how windowing is performed in Flink and how the
          +programmer can benefit to the maximum from its offered functionality.

          -* This will be replaced by the TOC
          -{:toc}
          +The general structure of a windowed Flink program is presented below. This is also going to serve as a roadmap for
          +the rest of the page.
          -## Basics
          +    stream
          +            .keyBy(...)          <-  keyed versus non-keyed windows
          +            .window(...)         <-  required: "assigner"
          +           [.trigger(...)]       <-  optional: "trigger" (else default trigger)
          +           [.evictor(...)]       <-  optional: "evictor" (else no evictor)
          +           [.allowedLateness()]  <-  optional, else zero
          +            .reduce/fold/apply() <-  required: "function"

          -For a windowed transformation you must at least specify a key
          -(see [specifying keys]({{ site.baseurl }}/dev/api_concepts.html#specifying-keys)),
          -a window assigner and a window function. The key divides the infinite, non-keyed, stream
          -into logical keyed streams while the window assigner assigns elements to finite per-key windows.
          -Finally, the window function is used to process the elements of each window.
          +In the above, the commands in square brackets ([...]) are optional. This reveals that Flink allows you to customize your
          +windowing logic in many different ways so that it best fits your needs.

          -The basic structure of a windowed transformation is thus as follows:
          +* This will be replaced by the TOC
          +{:toc}

          -<div class="codetabs" markdown="1">
          -<div data-lang="java" markdown="1">
          -

          {% highlight java %}

          -DataStream<T> input = ...;
          +## Window Lifecycle

          -input

          - .keyBy(<key selector>)
          - .window(<window assigner>)
          - .<windowed transformation>(<window function>);
          -{% endhighlight %}
          -</div>
            +In a nutshell, a window is *created* as soon as the first element that should belong to this window arrives, and the
            +window is *completely removed* when the time (event or processing time) passes its end timestamp plus the user-specified
            +`allowed lateness` (see [Allowed Lateness](#allowed-lateness)). Flink guarantees removal only for time-based
            +windows and not for other types, e.g. global windows (see [Window Assigners](#window-assigners)). For example, with an
            +event-time-based windowing strategy that creates non-overlapping (or tumbling) windows every 5 minutes and has an allowed
            +lateness of 1 min, Flink will create a new window for the interval between `12:00` and `12:05` when the first element with
            +a timestamp that falls into this interval arrives, and it will remove it when the watermark passes the `12:06`
            +timestamp.

            -<div data-lang="scala" markdown="1">
            -{% highlight scala %}
            -val input: DataStream[T] = ...
            +In addition, each window will have a `Trigger` (see [Triggers](#triggers)) and a function (`WindowFunction`, `ReduceFunction` or
            +`FoldFunction`) (see [Window Functions](#window-functions)) attached to it. The function will contain the computation to
            +be applied to the contents of the window, while the `Trigger` specifies the conditions under which the window is
            +considered ready for the function to be applied. A triggering policy might be something like "when the number of elements
            +in the window is more than 4", or "when the watermark passes the end of the window". A trigger can also decide to
            +purge a window's contents any time between its creation and removal. Purging in this case only refers to the elements
            +in the window, and not the window metadata. This means that new data can still be added to that window.

            -input
            - .keyBy(<key selector>)
            - .window(<window assigner>)
            - .<windowed transformation>(<window function>)
            -{% endhighlight %}

            -</div>
            -</div>
            +Apart from the above, you can specify an `Evictor` (see [Evictors](#evictors)) which will be able to remove
            +elements from the window after the trigger fires and before and/or after the function is applied.

          -We will cover [window assigners](#window-assigners) in a separate section below.
          +In the following we go into more detail for each of the components above. We start with the required parts in the above
          +snippet (see [Keyed vs Non-Keyed Windows](#keyed-vs-non-keyed-windows), [Window Assigner](#window-assigner), and
          +[Window Function](#window-function)) before moving to the optional ones.

          -The window transformation can be one of `reduce()`, `fold()` or `apply()`. Which respectively
          -takes a `ReduceFunction`, `FoldFunction` or `WindowFunction`. We describe each of these ways
          -of specifying a windowed transformation in detail below: [window functions](#window-functions).
          +## Keyed vs Non-Keyed Windows

          -For more advanced use cases you can also specify a `Trigger` that determines when exactly a window
          -is being considered as ready for processing. These will be covered in more detail in
          -[triggers](#triggers).
          +The first thing to specify is whether your stream should be keyed or not. This has to be done before defining the window.
          +Using the `keyBy(...)` will split your infinite stream into logical keyed streams. If `keyBy(...)` is not called, your
          +stream is not keyed.

          -## Window Assigners
                +In the case of keyed streams, any attribute of your incoming events can be used as a key
                +(more details [here]({{ site.baseurl }}/dev/api_concepts.html#specifying-keys)). Having a keyed stream will
                +allow your windowed computation to be performed in parallel by multiple tasks, as each logical keyed stream can be processed
                +independently from the rest. All elements referring to the same key will be sent to the same parallel task.

          -The window assigner specifies how elements of the stream are divided into finite slices. Flink comes
          -with pre-implemented window assigners for the most typical use cases, namely tumbling windows,
          -sliding windows, session windows and global windows, but you can implement your own by
          -extending the `WindowAssigner` class. All the built-in window assigners, except for the global
          -windows one, assign elements to windows based on time, which can either be processing time or event
          -time. Please take a look at our section on [event time]({{ site.baseurl }}/dev/event_time.html) for more
          -information about how Flink deals with time.
          +In case of non-keyed streams, your original stream will not be split into multiple logical streams and all the windowing logic
          +will be performed by a single task, i.e. with parallelism of 1.

          -Let's first look at how each of these window assigners works before looking at how they can be used
          -in a Flink program. We will be using abstract figures to visualize the workings of each assigner:
          -in the following, the purple circles are elements of the stream, they are partitioned
          -by some key (in this case user 1, user 2 and user 3) and the x-axis shows the progress
          -of time.
          +## Window Assigners

          -### Global Windows
          +After specifying whether your stream is keyed or not, the next step is to define a windowing strategy.
          --- End diff --

          I think we should stick to `window assigner` here because that's what we're describing. In my mind, the ensemble of window assigner, trigger (and evictor) is actually the `windowing strategy` since only those together define what happens in the end.

          What do you think?

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3191#discussion_r97823809

          --- Diff: docs/dev/windows.md ---
          @@ -204,72 +221,120 @@ input

          {% highlight scala %}
          val input: DataStream[T] = ...

          -// tumbling event-time windows
          -input
          - .keyBy(<key selector>)
          - .window(TumblingEventTimeWindows.of(Time.seconds(5)))
          - .<windowed transformation>(<window function>)
          -
          // sliding event-time windows
          input
          .keyBy(<key selector>)
          .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
          .<windowed transformation>(<window function>)

          -// event-time session windows
          +// sliding processing-time windows
          input
          .keyBy(<key selector>)
          - .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
          + .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
          .<windowed transformation>(<window function>)

          -// tumbling processing-time windows
          +// sliding processing-time windows offset by -8 hours
          input
          .keyBy(<key selector>)
          - .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
          + .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
          .<windowed transformation>(<window function>)
          +{% endhighlight %}
          +</div>
          +</div>

          -// sliding processing-time windows
          +Time intervals can be specified by using one of `Time.milliseconds`, `Time.seconds`,
          +`Time.minutes`, and so on.
          +
          +As shown in the last example, sliding window assigners also take an optional `offset` parameter
          +that can be used to change the alignment of windows. For example, without offsets hourly windows
          +sliding by 30 minutes are aligned with epoch, that is you will get windows such as
          +`1:00:00.000 - 1:59:59.999`, `1:30:00.000 - 2:29:59.999` and so on. If you want to change that
          +you can give an offset. With an offset of 15 minutes you would, for example, get
          +`1:15:00.000 - 2:14:59.999`, `1:45:00.000 - 2:44:59.999` etc.
          +An important use case for offsets is to adjust windows to timezones other than UTC-0.
          +For example, in China you would have to specify an offset of `Time.hours(-8)`.
          +
          +### Session Windows
          +
          +The session windows assigner groups elements by sessions of activity. Session windows do not overlap and
          +do not have a fixed start and end time in contrast to tumbling windows and sliding windows. Instead a
          +session window assigner closes a window when it does not receive elements for a certain period
          +of time, i.e., when a gap of inactivity occurred. A session window assigner is configured with the session gap which
          +defines how long the assigner waits until it closes the current session window and assigns following elements
          +to a new session window.
          +
          +<img src="{{ site.baseurl }}/fig/session-windows.svg" class="center" style="width: 80%;" />
          +
          +The following code snippets show how to use session windows.
          +
          +<div class="codetabs" markdown="1">
          +<div data-lang="java" markdown="1">
          +{% highlight java %}
          +DataStream<T> input = ...;
          +
          +// event-time session windows
          input
          .keyBy(<key selector>)
          - .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
          - .<windowed transformation>(<window function>)
          + .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
          + .<windowed transformation>(<window function>);

          // processing-time session windows
          input
          .keyBy(<key selector>)
          .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
          + .<windowed transformation>(<window function>);
          +{% endhighlight %}
          +</div>
          +
          +<div data-lang="scala" markdown="1">
          +{% highlight scala %}

          +val input: DataStream[T] = ...
          +
          +// event-time session windows
          +input
          + .keyBy(<key selector>)
          + .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
          .<windowed transformation>(<window function>)

          -// global windows
          +// processing-time session windows
          input
          .keyBy(<key selector>)

          - .window(GlobalWindows.create())
          + .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
          + .<windowed transformation>(<window function>)
          {% endhighlight %}

            </div>
            </div>

          -Note, how we can specify a time interval by using one of `Time.milliseconds`, `Time.seconds`,
          +Time intervals can be specified by using one of `Time.milliseconds`, `Time.seconds`,
          `Time.minutes`, and so on.

          -The time-based window assigners also take an optional `offset` parameter that can be used to
          -change the alignment of windows. For example, without offsets hourly windows are aligned
          -with epoch, that is you will get windows such as `1:00 - 1:59`, `2:00 - 2:59` and so on. If you
          -want to change that you can give an offset. With an offset of 15 minutes you would, for example,
          -get `1:15 - 2:14`, `2:15 - 3:14` etc. Another important use case for offsets is when you
          -want to have daily windows and live in a timezone other than UTC-0. For example, in China
          -you would have to specify an offset of `Time.hours(-8)`.
          +<span class="label label-danger">Attention</span> Since session windows do not have a fixed start and end,
          +they are evaluated differently than tumbling and sliding windows. Internally, a session window operator
          +creates a new window for each arriving record and merges windows together if their are closer to each other
          +than the defined gap.
          +In order to be mergable, a session window operator requires a mergable [Trigger](#triggers) and a mergable
          --- End diff --

          I think it should be `mergeable` instead of `mergable`; it appears several times in the text.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3191#discussion_r97827894

          — Diff: docs/dev/windows.md —
          @@ -622,133 +690,138 @@ input
          </div>
          </div>

          ## Dealing with Late Data
          +## Triggers

          -When working with event-time windowing it can happen that elements arrive late, i.e the
          -watermark that Flink uses to keep track of the progress of event-time is already past the
          -end timestamp of a window to which an element belongs. Please
          -see [event time](./event_time.html) and especially
          -[late elements](./event_time.html#late-elements) for a more thorough discussion of
          -how Flink deals with event time.
          +A `Trigger` determines when a window (as formed by the `WindowAssigner`) is ready to be
          +processed by the window function. Each `WindowAssigner` comes with a default `Trigger`.
          +If the default trigger does not fit your needs, you can specify a custom trigger using `trigger(...)`.

          -You can specify how a windowed transformation should deal with late elements and how much lateness
          -is allowed. The parameter for this is called allowed lateness. This specifies by how much time
          -elements can be late. Elements that arrive within the allowed lateness are still put into windows
          -and are considered when computing window results. If elements arrive after the allowed lateness they
          -will be dropped. Flink will also make sure that any state held by the windowing operation is garbage
          -collected once the watermark passes the end of a window plus the allowed lateness.
          +The trigger interface provides five methods that react to different events:
          — End diff –

          I don't think the interface "provides" the methods; maybe write "The trigger interface has five methods that allow a Trigger to react to different events".
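Here is a hypothetical plain-Java model of the five callbacks listed in this hunk, with no Flink dependency. `EventTimeLikeTrigger` is meant to behave like the event-time trigger described in the text, which fires once the watermark passes the end of the window. It registers a timer for the window's last timestamp in `onElement` and fires in `onEventTime`. It ignores processing time, registers a timer for the merged window in `onMerge`, and deletes its timer in `clear`. All types and signatures are simplified assumptions, not Flink's `Trigger` API.

```java
import java.util.TreeSet;

// Hypothetical, simplified model of the five trigger callbacks; NOT Flink's
// Trigger API. A window is represented only by its last timestamp (end - 1).
final class TriggerCallbacks {

    enum Result { CONTINUE, FIRE, FIRE_AND_PURGE }

    // Stand-in for a timer service: the currently registered event-time timers.
    static final class Timers {
        final TreeSet<Long> eventTime = new TreeSet<>();
    }

    // Fires once the watermark reaches the window's last timestamp.
    static final class EventTimeLikeTrigger {

        Result onElement(long windowMaxTs, long watermark, Timers timers) {
            if (windowMaxTs <= watermark) {
                return Result.FIRE;              // window already complete: fire right away
            }
            timers.eventTime.add(windowMaxTs);   // otherwise wait for the watermark
            return Result.CONTINUE;
        }

        Result onEventTime(long time, long windowMaxTs) {
            return time == windowMaxTs ? Result.FIRE : Result.CONTINUE;
        }

        Result onProcessingTime(long time, long windowMaxTs) {
            return Result.CONTINUE;              // this trigger ignores processing time
        }

        // Windows merged (e.g. sessions): make sure the merged window gets a timer.
        void onMerge(long mergedWindowMaxTs, Timers timers) {
            timers.eventTime.add(mergedWindowMaxTs);
        }

        // Window removed: drop the timer this trigger registered for it.
        void clear(long windowMaxTs, Timers timers) {
            timers.eventTime.remove(windowMaxTs);
        }
    }
}
```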

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3191#discussion_r97833002

          — Diff: docs/dev/windows.md —
          @@ -758,8 +831,33 @@ input
          val input: DataStream[T] = ...

          input
          - .windowAll(<window assigner>)
          + .keyBy(<key selector>)
          + .window(<window assigner>)
          + .allowedLateness(<time>)
          .<windowed transformation>(<window function>)
          {% endhighlight %}
          </div>
          </div>
          +
          +<span class="label label-info">Note</span> When using the `GlobalWindows` window assigner no
          +data is ever considered late because the end timestamp of the global window is `Long.MAX_VALUE`.
          +
          +### Late elements considerations
          +
          +When specifying an allowed lateness greater than 0, the window along with its content is kept after the watermark passes
          +the end of the window. In these cases, when a late but not dropped element arrives, it will trigger another firing for the
          — End diff –

          Not "it will trigger" but "it could trigger" (it all depends on the Trigger 😉).
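The point about late elements can be illustrated with a hypothetical plain-Java model. It assumes a window `[start, end)` whose last timestamp is `end - 1`, and a trigger that fires once the watermark reaches that timestamp. The class and method names are ours, not Flink's. An element is on time while the watermark is still before the window's last timestamp. It is late but kept while the watermark is before that timestamp plus the allowed lateness, and it is dropped after that. As the comment says, whether a kept late element actually causes another firing is up to the trigger.

```java
// Hypothetical model, NOT Flink code. Assumes a window [start, end) whose last
// timestamp is end - 1, and an event-time trigger that fires once the
// watermark reaches that timestamp.
final class LatenessModel {

    enum Outcome { ON_TIME, LATE_NOT_DROPPED, DROPPED }

    // Watermark at which the window (and its state) can be garbage collected.
    static long cleanupTime(long windowEnd, long allowedLateness) {
        return (windowEnd - 1) + allowedLateness;
    }

    static Outcome classify(long windowEnd, long watermark, long allowedLateness) {
        long maxTimestamp = windowEnd - 1;
        if (watermark < maxTimestamp) {
            return Outcome.ON_TIME;          // the window has not fired yet
        }
        if (cleanupTime(windowEnd, allowedLateness) <= watermark) {
            return Outcome.DROPPED;          // past end + allowed lateness: window is gone
        }
        return Outcome.LATE_NOT_DROPPED;     // added to the kept window; may cause another firing
    }
}
```

With the default allowed lateness of 0, this model drops any element that arrives once the watermark has reached the window's last timestamp.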

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3191#discussion_r97829270

          — Diff: docs/dev/windows.md —
          @@ -622,133 +690,138 @@ input
          </div>
          </div>

          ## Dealing with Late Data
          +## Triggers

          -When working with event-time windowing it can happen that elements arrive late, i.e the
          -watermark that Flink uses to keep track of the progress of event-time is already past the
          -end timestamp of a window to which an element belongs. Please
          -see [event time](./event_time.html) and especially
          -[late elements](./event_time.html#late-elements) for a more thorough discussion of
          -how Flink deals with event time.
          +A `Trigger` determines when a window (as formed by the `WindowAssigner`) is ready to be
          +processed by the window function. Each `WindowAssigner` comes with a default `Trigger`.
          +If the default trigger does not fit your needs, you can specify a custom trigger using `trigger(...)`.

          -You can specify how a windowed transformation should deal with late elements and how much lateness
          -is allowed. The parameter for this is called allowed lateness. This specifies by how much time
          -elements can be late. Elements that arrive within the allowed lateness are still put into windows
          -and are considered when computing window results. If elements arrive after the allowed lateness they
          -will be dropped. Flink will also make sure that any state held by the windowing operation is garbage
          -collected once the watermark passes the end of a window plus the allowed lateness.
          +The trigger interface provides five methods that react to different events:

          -<span class="label label-info">Default</span> By default, the allowed lateness is set to
          -`0`. That is, elements that arrive behind the watermark will be dropped.
          +* The `onElement()` method is called for each element that is added to a window.
          +* The `onEventTime()` method is called when a registered event-time timer fires.
          +* The `onProcessingTime()` method is called when a registered processing-time timer fires.
          +* The `onMerge()` method is relevant for stateful triggers and merges the states of two triggers when their corresponding windows merge, e.g. when using session windows.
          +* Finally the `clear()` method performs any action needed upon removal of the corresponding window.

          -You can specify an allowed lateness like this:
          +Any of these methods can be used to register processing- or event-time timers for future actions.

          -<div class="codetabs" markdown="1">
          -<div data-lang="java" markdown="1">
          -{% highlight java %}
          -DataStream<T> input = ...;
          +### Fire and Purge

          -input
          - .keyBy(<key selector>)
          - .window(<window assigner>)
          - .allowedLateness(<time>)
          - .<windowed transformation>(<window function>);
          -{% endhighlight %}
          -</div>
          +Once a trigger determines that a window is ready for processing, it fires. This is the signal for the window operator to emit the result of the current window. Given a window with a `WindowFunction`
          +all elements are passed to the `WindowFunction` (possibly after passing them to an evictor).
          +Windows with `ReduceFunction` of `FoldFunction` simply emit their eagerly aggregated result.

          -<div data-lang="scala" markdown="1">
          -{% highlight scala %}
          -val input: DataStream[T] = ...
          +When a trigger fires, it can either `FIRE` or `FIRE_AND_PURGE`. While `FIRE` keeps the contents of the window, `FIRE_AND_PURGE` removes its content.
          +By default, the pre-implemented triggers simply `FIRE` without purging the window state.

          -input
          - .keyBy(<key selector>)
          - .window(<window assigner>)
          - .allowedLateness(<time>)
          - .<windowed transformation>(<window function>)
          -{% endhighlight %}
          -</div>
          -</div>
          +<span class="label label-danger">Attention</span> When purging, only the contents of the window are cleared. The window itself is not removed and accepts new elements.

          -<span class="label label-info">Note</span> When using the `GlobalWindows` window assigner no
          -data is ever considered late because the end timestamp of the global window is `Long.MAX_VALUE`.
          +### Default Triggers of WindowAssigners

          -## Triggers
          +The default `Trigger` of a `WindowAssigner` is appropriate for many use cases. For example, all the event-time window assigners have an `EventTimeTrigger` as
          +default trigger. This trigger simply fires once the watermark passes the end of a window.

          -A `Trigger` determines when a window (as assigned by the `WindowAssigner`) is ready for being
          -processed by the window function. The trigger observes how elements are added to windows
          -and can also keep track of the progress of processing time and event time. Once a trigger
          -determines that a window is ready for processing, it fires. This is the signal for the
          -window operation to take the elements that are currently in the window and pass them along to
          -the window function to produce output for the firing window.
          +<span class="label label-danger">Attention</span> The default trigger of the `GlobalWindow` is the `NeverTrigger` which does never fire. Consequently, you always have to define a custom trigger when using a `GlobalWindow`.

          -Each `WindowAssigner` (except `GlobalWindows`) comes with a default trigger that should be
          -appropriate for most use cases. For example, `TumblingEventTimeWindows` has an `EventTimeTrigger` as
          -default trigger. This trigger simply fires once the watermark passes the end of a window.
          +<span class="label label-danger">Attention</span> By specifying a trigger using `trigger()` you
          +are overwriting the default trigger of a `WindowAssigner`. For example, if you specify a
          +`CountTrigger` for `TumblingEventTimeWindows` you will no longer get window firings based on the
          +progress of time but only by count. Right now, you have to write your own custom trigger if
          +you want to react based on both time and count.

          -You can specify the trigger to be used by calling `trigger()` with a given `Trigger`. The
          -whole specification of the windowed transformation would then look like this:
          +### Built-in and Custom Triggers

          -<div class="codetabs" markdown="1">
          -<div data-lang="java" markdown="1">
          -{% highlight java %}

          -DataStream<T> input = ...;
          +Flink comes with a few built-in triggers.

          -input
          - .keyBy(<key selector>)
          - .window(<window assigner>)
          - .trigger(<trigger>)
          - .<windowed transformation>(<window function>);
          -{% endhighlight %}
          -</div>
          +* The (already mentioned) `EventTimeTrigger` fires based on the progress of event-time as measured by watermarks.
          +* The `ProcessingTimeTrigger` fires based on processing time.
          +* The `CountTrigger` which fires once the number of elements in a window exceeds the given limit.
          +* The `PurgingTrigger` takes as argument another trigger and transforms it into a purging one.

          -<div data-lang="scala" markdown="1">
          -{% highlight scala %}
          -val input: DataStream[T] = ...
          +If you need to implement a custom trigger, you should check out the abstract {% gh_link /flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java "Trigger" %} class. Please note that the API is still evolving and might change in future versions of Flink.
          — End diff –

          Maybe we should link to the Javadoc here instead of the source file on GitHub.
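The difference between `FIRE` and `FIRE_AND_PURGE` described in this hunk can be shown with a hypothetical plain-Java model of a single window guarded by a count-based trigger. The class is ours, not Flink's `CountTrigger` or `PurgingTrigger`. In particular, we assume the count trigger fires when the count reaches `maxCount` and then resets its count.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, NOT Flink code: one window plus a count-based trigger,
// showing what FIRE and FIRE_AND_PURGE do to the window contents.
final class FirePurgeModel {

    enum Result { CONTINUE, FIRE, FIRE_AND_PURGE }

    private final List<Integer> contents = new ArrayList<>();      // window contents
    private final List<List<Integer>> emitted = new ArrayList<>(); // what the window function saw
    private final long maxCount;
    private final boolean purging; // true: behave as if wrapped in a purging trigger
    private long count = 0;

    FirePurgeModel(long maxCount, boolean purging) {
        this.maxCount = maxCount;
        this.purging = purging;
    }

    // Assumed count-trigger logic: fire when maxCount elements arrived since the last firing.
    private Result onElement() {
        count++;
        if (count >= maxCount) {
            count = 0;
            return purging ? Result.FIRE_AND_PURGE : Result.FIRE;
        }
        return Result.CONTINUE;
    }

    void add(int element) {
        contents.add(element);
        Result result = onElement();
        if (result != Result.CONTINUE) {
            emitted.add(new ArrayList<>(contents)); // window function sees all current contents
        }
        if (result == Result.FIRE_AND_PURGE) {
            contents.clear(); // only the contents are dropped; the window keeps accepting elements
        }
    }

    List<List<Integer>> emitted() {
        return emitted;
    }
}
```

With `maxCount = 2`, adding the elements 1 to 4 emits `[1, 2]` and then `[1, 2, 3, 4]` in firing mode. In purging mode it emits `[1, 2]` and then `[3, 4]`, because the purged window keeps accepting elements and simply restarts from empty contents.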

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3191#discussion_r97827094

          — Diff: docs/dev/windows.md —
          @@ -204,72 +221,120 @@ input

          {% highlight scala %}
          val input: DataStream[T] = ...

          -// tumbling event-time windows
          -input
          - .keyBy(<key selector>)
          - .window(TumblingEventTimeWindows.of(Time.seconds(5)))
          - .<windowed transformation>(<window function>)
          -
          // sliding event-time windows
          input
          .keyBy(<key selector>)
          .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
          .<windowed transformation>(<window function>)

          -// event-time session windows
          +// sliding processing-time windows
          input
          .keyBy(<key selector>)
          - .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
          + .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
          .<windowed transformation>(<window function>)

          -// tumbling processing-time windows
          +// sliding processing-time windows offset by -8 hours
          input
          .keyBy(<key selector>)
          - .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
          + .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
          .<windowed transformation>(<window function>)
          +{% endhighlight %}
          +</div>
          +</div>

          -// sliding processing-time windows
          +Time intervals can be specified by using one of `Time.milliseconds`, `Time.seconds`,
          +`Time.minutes`, and so on.
          +
          +As shown in the last example, sliding window assigners also take an optional `offset` parameter
          +that can be used to change the alignment of windows. For example, without offsets hourly windows
          +sliding by 30 minutes are aligned with epoch, that is you will get windows such as
          +`1:00:00.000 - 1:59:59.999`, `1:30:00.000 - 2:29:59.999` and so on. If you want to change that
          +you can give an offset. With an offset of 15 minutes you would, for example, get
          +`1:15:00.000 - 2:14:59.999`, `1:45:00.000 - 2:44:59.999` etc.
          +An important use case for offsets is to adjust windows to timezones other than UTC-0.
          +For example, in China you would have to specify an offset of `Time.hours(-8)`.
          +
          +### Session Windows
          +
          +The session windows assigner groups elements by sessions of activity. Session windows do not overlap and
          +do not have a fixed start and end time in contrast to tumbling windows and sliding windows. Instead a
          +session window assigner closes a window when it does not receive elements for a certain period
          +of time, i.e., when a gap of inactivity occurred. A session window assigner is configured with the session gap which
          +defines how long the assigner waits until it closes the current session window and assigns following elements
          +to a new session window.
          +
          +<img src="{{ site.baseurl }}/fig/session-windows.svg" class="center" style="width: 80%;" />
          +
          +The following code snippets show how to use session windows.
          +
          +<div class="codetabs" markdown="1">
          +<div data-lang="java" markdown="1">
          +{% highlight java %}
          +DataStream<T> input = ...;
          +
          +// event-time session windows
          input
          .keyBy(<key selector>)
          - .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
          - .<windowed transformation>(<window function>)
          + .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
          + .<windowed transformation>(<window function>);

          // processing-time session windows
          input
          .keyBy(<key selector>)
          .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
          + .<windowed transformation>(<window function>);
          +{% endhighlight %}
          +</div>
          +
          +<div data-lang="scala" markdown="1">
          +{% highlight scala %}
          +val input: DataStream[T] = ...
          +
          +// event-time session windows
          +input
          + .keyBy(<key selector>)
          + .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
          .<windowed transformation>(<window function>)

          -// global windows
          +// processing-time session windows
          input
          .keyBy(<key selector>)
          - .window(GlobalWindows.create())
          + .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
          + .<windowed transformation>(<window function>)
          {% endhighlight %}
          </div>
          </div>

          -Note, how we can specify a time interval by using one of `Time.milliseconds`, `Time.seconds`,
          +Time intervals can be specified by using one of `Time.milliseconds`, `Time.seconds`,
          `Time.minutes`, and so on.

          -The time-based window assigners also take an optional `offset` parameter that can be used to
          -change the alignment of windows. For example, without offsets hourly windows are aligned
          -with epoch, that is you will get windows such as `1:00 - 1:59`, `2:00 - 2:59` and so on. If you
          -want to change that you can give an offset. With an offset of 15 minutes you would, for example,
          -get `1:15 - 2:14`, `2:15 - 3:14` etc. Another important use case for offsets is when you
          -want to have daily windows and live in a timezone other than UTC-0. For example, in China
          -you would have to specify an offset of `Time.hours(-8)`.
          +<span class="label label-danger">Attention</span> Since session windows do not have a fixed start and end,
          +they are evaluated differently than tumbling and sliding windows. Internally, a session window operator
          +creates a new window for each arriving record and merges windows together if their are closer to each other
          +than the defined gap.
          +In order to be mergable, a session window operator requires a mergable [Trigger](#triggers) and a mergable
          — End diff –

          I think it should be "merging Trigger" and also "merging WindowFunction", since they are not themselves mergeable, just "merge aware" or "merge compatible".
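The merging behavior described in this hunk can be sketched in plain Java: each record gets its own window, and windows are merged when their records are closer than the gap. This is a hypothetical model, not Flink's session window implementation. Windows are half-open intervals `[t, t + gap)` and are merged only when they overlap, so two records exactly one gap apart stay in separate sessions. That boundary handling is an assumption. Because a session's extent keeps changing as records arrive, any trigger or window function attached to it has to be merge-aware.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, NOT Flink's implementation: every arriving record opens a
// window [t, t + gap), which absorbs all existing windows it overlaps.
final class SessionMerging {

    // Existing windows never overlap each other, so a single pass is enough:
    // the growing window can only reach windows it overlaps directly.
    static List<long[]> addRecord(List<long[]> windows, long timestamp, long gap) {
        long start = timestamp;
        long end = timestamp + gap;
        List<long[]> result = new ArrayList<>();
        for (long[] w : windows) {
            if (w[0] < end && start < w[1]) {   // overlap: records closer than the gap
                start = Math.min(start, w[0]);
                end = Math.max(end, w[1]);
            } else {
                result.add(w);                  // unrelated session, keep as-is
            }
        }
        result.add(new long[] {start, end});
        result.sort((a, b) -> Long.compare(a[0], b[0]));
        return result;
    }

    // Renders the sessions as "[start,end) [start,end) ...".
    static String describe(List<long[]> windows) {
        StringBuilder sb = new StringBuilder();
        for (long[] w : windows) {
            if (sb.length() > 0) sb.append(' ');
            sb.append('[').append(w[0]).append(',').append(w[1]).append(')');
        }
        return sb.toString();
    }
}
```

Feeding the timestamps 0, 25, 5, 32 and 100 with a gap of 10 yields the sessions `[0,15) [25,42) [100,110)`. The out-of-order record at 5 extends the first session instead of opening a new one.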

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3191#discussion_r97828763

          --- Diff: docs/dev/windows.md ---
          @@ -622,133 +690,138 @@ input
          </div>
          </div>

          -## Dealing with Late Data
          +## Triggers

          -When working with event-time windowing it can happen that elements arrive late, i.e the
          -watermark that Flink uses to keep track of the progress of event-time is already past the
          -end timestamp of a window to which an element belongs. Please
          -see [event time](./event_time.html) and especially
          -[late elements](./event_time.html#late-elements) for a more thorough discussion of
          -how Flink deals with event time.
          +A `Trigger` determines when a window (as formed by the `WindowAssigner`) is ready to be
          +processed by the window function. Each `WindowAssigner` comes with a default `Trigger`.
          +If the default trigger does not fit your needs, you can specify a custom trigger using `trigger(...)`.

          -You can specify how a windowed transformation should deal with late elements and how much lateness
          -is allowed. The parameter for this is called allowed lateness. This specifies by how much time
          -elements can be late. Elements that arrive within the allowed lateness are still put into windows
          -and are considered when computing window results. If elements arrive after the allowed lateness they
          -will be dropped. Flink will also make sure that any state held by the windowing operation is garbage
          -collected once the watermark passes the end of a window plus the allowed lateness.
          +The trigger interface provides five methods that react to different events:

          -<span class="label label-info">Default</span> By default, the allowed lateness is set to
          -`0`. That is, elements that arrive behind the watermark will be dropped.
          +* The `onElement()` method is called for each element that is added to a window.
          +* The `onEventTime()` method is called when a registered event-time timer fires.
          +* The `onProcessingTime()` method is called when a registered processing-time timer fires.
          +* The `onMerge()` method is relevant for stateful triggers and merges the states of two triggers when their corresponding windows merge, e.g. when using session windows.
          +* Finally the `clear()` method performs any action needed upon removal of the corresponding window.

          -You can specify an allowed lateness like this:
          +Any of these methods can be used to register processing- or event-time timers for future actions.

          -<div class="codetabs" markdown="1">
          -<div data-lang="java" markdown="1">
          -{% highlight java %}
          -DataStream<T> input = ...;
          +### Fire and Purge

          -input
          - .keyBy(<key selector>)
          - .window(<window assigner>)
          - .allowedLateness(<time>)
          - .<windowed transformation>(<window function>);
          -{% endhighlight %}
          -</div>
          +Once a trigger determines that a window is ready for processing, it fires. This is the signal for the window operator to emit the result of the current window. Given a window with a `WindowFunction`
          +all elements are passed to the `WindowFunction` (possibly after passing them to an evictor).
          +Windows with `ReduceFunction` of `FoldFunction` simply emit their eagerly aggregated result.

          -<div data-lang="scala" markdown="1">
          -{% highlight scala %}
          -val input: DataStream[T] = ...
          +When a trigger fires, it can either `FIRE` or `FIRE_AND_PURGE`. While `FIRE` keeps the contents of the window, `FIRE_AND_PURGE` removes its content.
          +By default, the pre-implemented triggers simply `FIRE` without purging the window state.

          -input
          - .keyBy(<key selector>)
          - .window(<window assigner>)
          - .allowedLateness(<time>)
          - .<windowed transformation>(<window function>)
          -{% endhighlight %}
          -</div>
          -</div>
          +<span class="label label-danger">Attention</span> When purging, only the contents of the window are cleared. The window itself is not removed and accepts new elements.

          --- End diff --

          This is a bit tricky because for non-merging windows there is nothing that could be removed except the elements. Maybe write that purging simply removes the contents of the window and leaves any potential meta information, as well as the Trigger state, intact.
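The suggested wording can be made concrete with a small model of what each firing outcome does to a window. This is a hypothetical plain-Java sketch, not Flink's API, and `WindowPane` and `Outcome` are invented names. `FIRE` hands the buffered elements to the window function and keeps them. `FIRE_AND_PURGE` hands them over and then clears only the contents. The window metadata and the trigger's own state stay untouched, as described in the comment above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the firing outcomes discussed in the thread.
enum Outcome { CONTINUE, FIRE, PURGE, FIRE_AND_PURGE }

// Hypothetical model of one window: metadata, buffered contents, trigger state.
final class WindowPane {
    final String metadata;                            // e.g. window bounds; purge never touches it
    final List<Integer> contents = new ArrayList<>(); // elements for the window function
    long triggerCount = 0;                            // trigger state; survives a purge

    WindowPane(String metadata) {
        this.metadata = metadata;
    }

    void add(int element) {
        contents.add(element);
        triggerCount++;
    }

    // Returns the elements handed to the window function, or null if nothing fires.
    List<Integer> apply(Outcome outcome) {
        List<Integer> emitted = null;
        if (outcome == Outcome.FIRE || outcome == Outcome.FIRE_AND_PURGE) {
            emitted = new ArrayList<>(contents);
        }
        if (outcome == Outcome.PURGE || outcome == Outcome.FIRE_AND_PURGE) {
            contents.clear(); // only the buffered elements are removed
        }
        return emitted;
    }
}
```

In this model, an element added after a purge starts a fresh batch of contents. The trigger count keeps accumulating across the purge, which illustrates why the docs should say that trigger state is left intact.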

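The allowed-lateness paragraphs removed in this diff describe a simple rule. Elements that arrive within the allowed lateness still enter the window. Later elements are dropped, and window state is garbage-collected once the watermark passes the window end plus the allowed lateness. The sketch below assumes event-time milliseconds and identifies a window by its maximum timestamp. `Lateness` and its methods are hypothetical names for illustration. The overflow guard matters for windows that end at `Long.MAX_VALUE`, such as the global window.

```java
// Hypothetical model of the allowed-lateness rule (event time, milliseconds).
final class Lateness {
    // Time at which the window's state can be garbage-collected.
    static long cleanupTime(long windowMaxTimestamp, long allowedLateness) {
        long t = windowMaxTimestamp + allowedLateness;
        // Guard against overflow, e.g. a window ending at Long.MAX_VALUE.
        return t >= windowMaxTimestamp ? t : Long.MAX_VALUE;
    }

    // An element for this window is dropped once the watermark reaches the cleanup time.
    static boolean isDropped(long windowMaxTimestamp, long allowedLateness, long watermark) {
        return cleanupTime(windowMaxTimestamp, allowedLateness) <= watermark;
    }

    // Late (watermark at or past the window end) but still within the allowed lateness.
    static boolean isLateButAccepted(long windowMaxTimestamp, long allowedLateness, long watermark) {
        return watermark >= windowMaxTimestamp
                && !isDropped(windowMaxTimestamp, allowedLateness, watermark);
    }
}
```

With the default lateness of `0`, `isLateButAccepted` is never true in this model: anything behind the watermark is dropped, matching the removed "Default" note.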
          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3191#discussion_r97828488

          --- Diff: docs/dev/windows.md ---
          @@ -622,133 +690,138 @@ input
          </div>
          </div>

          -## Dealing with Late Data
          +## Triggers

          -When working with event-time windowing it can happen that elements arrive late, i.e the
          -watermark that Flink uses to keep track of the progress of event-time is already past the
          -end timestamp of a window to which an element belongs. Please
          -see [event time](./event_time.html) and especially
          -[late elements](./event_time.html#late-elements) for a more thorough discussion of
          -how Flink deals with event time.
          +A `Trigger` determines when a window (as formed by the `WindowAssigner`) is ready to be
          +processed by the window function. Each `WindowAssigner` comes with a default `Trigger`.
          +If the default trigger does not fit your needs, you can specify a custom trigger using `trigger(...)`.

          -You can specify how a windowed transformation should deal with late elements and how much lateness
          -is allowed. The parameter for this is called allowed lateness. This specifies by how much time
          -elements can be late. Elements that arrive within the allowed lateness are still put into windows
          -and are considered when computing window results. If elements arrive after the allowed lateness they
          -will be dropped. Flink will also make sure that any state held by the windowing operation is garbage
          -collected once the watermark passes the end of a window plus the allowed lateness.
          +The trigger interface provides five methods that react to different events:

          -<span class="label label-info">Default</span> By default, the allowed lateness is set to
          -`0`. That is, elements that arrive behind the watermark will be dropped.
          +* The `onElement()` method is called for each element that is added to a window.
          +* The `onEventTime()` method is called when a registered event-time timer fires.
          +* The `onProcessingTime()` method is called when a registered processing-time timer fires.
          +* The `onMerge()` method is relevant for stateful triggers and merges the states of two triggers when their corresponding windows merge, e.g. when using session windows.
          +* Finally the `clear()` method performs any action needed upon removal of the corresponding window.

          -You can specify an allowed lateness like this:
          +Any of these methods can be used to register processing- or event-time timers for future actions.

          -<div class="codetabs" markdown="1">
          -<div data-lang="java" markdown="1">
          -{% highlight java %}
          -DataStream<T> input = ...;
          +### Fire and Purge

          -input
          - .keyBy(<key selector>)
          - .window(<window assigner>)
          - .allowedLateness(<time>)
          - .<windowed transformation>(<window function>);
          -{% endhighlight %}
          -</div>
          +Once a trigger determines that a window is ready for processing, it fires. This is the signal for the window operator to emit the result of the current window. Given a window with a `WindowFunction`

          --- End diff --

          Maybe we should mention earlier which trigger methods can return a `TriggerResult` and briefly explain what that means: "A trigger fires when it returns FIRE or FIRE_AND_PURGE."
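The suggestion can be illustrated with a hypothetical sketch of the callback surface. In this sketch the three event callbacks (`onElement()`, `onEventTime()`, `onProcessingTime()`) return a result that may fire and/or purge, while `onMerge()` and `clear()` only manage state. `TriggerResult` here is a local stand-in with the four outcomes CONTINUE, FIRE, PURGE and FIRE_AND_PURGE. `SimpleTrigger` and `SketchEventTimeTrigger` are invented for illustration, and their simplified signatures do not match Flink's `Trigger` class. The example trigger fires once the watermark passes the end of the window, the behaviour the documentation attributes to `EventTimeTrigger`.

```java
import java.util.Set;
import java.util.TreeSet;

// Local stand-in: each outcome says whether to fire and whether to purge.
enum TriggerResult {
    CONTINUE(false, false),
    FIRE(true, false),
    PURGE(false, true),
    FIRE_AND_PURGE(true, true);

    private final boolean fire;
    private final boolean purge;

    TriggerResult(boolean fire, boolean purge) {
        this.fire = fire;
        this.purge = purge;
    }

    boolean isFire() { return fire; }
    boolean isPurge() { return purge; }
}

// Hypothetical simplified contract: event callbacks return a TriggerResult,
// while onMerge and clear only manage state and return nothing.
interface SimpleTrigger {
    TriggerResult onElement(long timestamp, long windowMaxTimestamp, long watermark);
    TriggerResult onEventTime(long time, long windowMaxTimestamp);
    TriggerResult onProcessingTime(long time, long windowMaxTimestamp);
    void onMerge(long mergedWindowMaxTimestamp);
    void clear(long windowMaxTimestamp);
}

// Fires once the watermark passes the end of the window.
final class SketchEventTimeTrigger implements SimpleTrigger {
    final Set<Long> eventTimeTimers = new TreeSet<>(); // registered timers, deduplicated

    public TriggerResult onElement(long timestamp, long windowMaxTimestamp, long watermark) {
        if (windowMaxTimestamp <= watermark) {
            return TriggerResult.FIRE; // window already complete, e.g. a late element
        }
        eventTimeTimers.add(windowMaxTimestamp); // wait for the watermark
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onEventTime(long time, long windowMaxTimestamp) {
        return time == windowMaxTimestamp ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, long windowMaxTimestamp) {
        return TriggerResult.CONTINUE; // processing time is ignored by this trigger
    }

    public void onMerge(long mergedWindowMaxTimestamp) {
        eventTimeTimers.add(mergedWindowMaxTimestamp); // re-register for the merged window end
    }

    public void clear(long windowMaxTimestamp) {
        eventTimeTimers.remove(windowMaxTimestamp); // drop the timer when the window goes away
    }
}
```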

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3191#discussion_r97828958

          --- Diff: docs/dev/windows.md ---
          @@ -622,133 +690,138 @@ input
          </div>
          </div>

          -## Dealing with Late Data
          +## Triggers

          -When working with event-time windowing it can happen that elements arrive late, i.e the
          -watermark that Flink uses to keep track of the progress of event-time is already past the
          -end timestamp of a window to which an element belongs. Please
          -see [event time](./event_time.html) and especially
          -[late elements](./event_time.html#late-elements) for a more thorough discussion of
          -how Flink deals with event time.
          +A `Trigger` determines when a window (as formed by the `WindowAssigner`) is ready to be
          +processed by the window function. Each `WindowAssigner` comes with a default `Trigger`.
          +If the default trigger does not fit your needs, you can specify a custom trigger using `trigger(...)`.

          -You can specify how a windowed transformation should deal with late elements and how much lateness
          -is allowed. The parameter for this is called allowed lateness. This specifies by how much time
          -elements can be late. Elements that arrive within the allowed lateness are still put into windows
          -and are considered when computing window results. If elements arrive after the allowed lateness they
          -will be dropped. Flink will also make sure that any state held by the windowing operation is garbage
          -collected once the watermark passes the end of a window plus the allowed lateness.
          +The trigger interface provides five methods that react to different events:

          -<span class="label label-info">Default</span> By default, the allowed lateness is set to
          -`0`. That is, elements that arrive behind the watermark will be dropped.
          +* The `onElement()` method is called for each element that is added to a window.
          +* The `onEventTime()` method is called when a registered event-time timer fires.
          +* The `onProcessingTime()` method is called when a registered processing-time timer fires.
          +* The `onMerge()` method is relevant for stateful triggers and merges the states of two triggers when their corresponding windows merge, e.g. when using session windows.
          +* Finally the `clear()` method performs any action needed upon removal of the corresponding window.

          -You can specify an allowed lateness like this:
          +Any of these methods can be used to register processing- or event-time timers for future actions.

          -<div class="codetabs" markdown="1">
          -<div data-lang="java" markdown="1">
          -{% highlight java %}
          -DataStream<T> input = ...;
          +### Fire and Purge

          -input
          - .keyBy(<key selector>)
          - .window(<window assigner>)
          - .allowedLateness(<time>)
          - .<windowed transformation>(<window function>);
          -{% endhighlight %}
          -</div>
          +Once a trigger determines that a window is ready for processing, it fires. This is the signal for the window operator to emit the result of the current window. Given a window with a `WindowFunction`
          +all elements are passed to the `WindowFunction` (possibly after passing them to an evictor).
          +Windows with `ReduceFunction` of `FoldFunction` simply emit their eagerly aggregated result.

          -<div data-lang="scala" markdown="1">
          -{% highlight scala %}
          -val input: DataStream[T] = ...
          +When a trigger fires, it can either `FIRE` or `FIRE_AND_PURGE`. While `FIRE` keeps the contents of the window, `FIRE_AND_PURGE` removes its content.
          +By default, the pre-implemented triggers simply `FIRE` without purging the window state.

          -input
          - .keyBy(<key selector>)
          - .window(<window assigner>)
          - .allowedLateness(<time>)
          - .<windowed transformation>(<window function>)
          -{% endhighlight %}
          -</div>
          -</div>
          +<span class="label label-danger">Attention</span> When purging, only the contents of the window are cleared. The window itself is not removed and accepts new elements.

          -<span class="label label-info">Note</span> When using the `GlobalWindows` window assigner no
          -data is ever considered late because the end timestamp of the global window is `Long.MAX_VALUE`.
          +### Default Triggers of WindowAssigners

          -## Triggers
          +The default `Trigger` of a `WindowAssigner` is appropriate for many use cases. For example, all the event-time window assigners have an `EventTimeTrigger` as
          +default trigger. This trigger simply fires once the watermark passes the end of a window.

          -A `Trigger` determines when a window (as assigned by the `WindowAssigner`) is ready for being
          -processed by the window function. The trigger observes how elements are added to windows
          -and can also keep track of the progress of processing time and event time. Once a trigger
          -determines that a window is ready for processing, it fires. This is the signal for the
          -window operation to take the elements that are currently in the window and pass them along to
          -the window function to produce output for the firing window.
          +<span class="label label-danger">Attention</span> The default trigger of the `GlobalWindow` is the `NeverTrigger` which does never fire. Consequently, you always have to define a custom trigger when using a `GlobalWindow`.

          -Each `WindowAssigner` (except `GlobalWindows`) comes with a default trigger that should be
          -appropriate for most use cases. For example, `TumblingEventTimeWindows` has an `EventTimeTrigger` as
          -default trigger. This trigger simply fires once the watermark passes the end of a window.
          +<span class="label label-danger">Attention</span> By specifying a trigger using `trigger()` you
          +are overwriting the default trigger of a `WindowAssigner`. For example, if you specify a
          +`CountTrigger` for `TumblingEventTimeWindows` you will no longer get window firings based on the
          +progress of time but only by count. Right now, you have to write your own custom trigger if
          +you want to react based on both time and count.

          -You can specify the trigger to be used by calling `trigger()` with a given `Trigger`. The
          -whole specification of the windowed transformation would then look like this:
          +### Built-in and Custom Triggers

          -<div class="codetabs" markdown="1">
          -<div data-lang="java" markdown="1">
          -{% highlight java %}
          -DataStream<T> input = ...;
          +Flink comes with a few built-in triggers.

          -input
          - .keyBy(<key selector>)
          - .window(<window assigner>)
          - .trigger(<trigger>)
          - .<windowed transformation>(<window function>);
          -{% endhighlight %}
          -</div>
          +* The (already mentioned) `EventTimeTrigger` fires based on the progress of event-time as measured by watermarks.
          +* The `ProcessingTimeTrigger` fires based on processing time.
          +* The `CountTrigger` which fires once the number of elements in a window exceeds the given limit.

          --- End diff --

          None of the other Triggers have a "which" here.
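The diff notes that reacting to both time and count currently requires a custom trigger. Such a trigger can be modelled in plain Java. This is a hypothetical sketch, not Flink's API, and `CountOrTimeTrigger` is an invented name. It fires on whichever comes first: the element count reaching `maxCount`, or the event-time timer for the window end. In this sketch the count resets after every firing, so each batch is counted afresh.

```java
// Hypothetical model, not Flink's Trigger API: fire on whichever comes first,
// `maxCount` elements in the window or the event-time timer for the window end.
final class CountOrTimeTrigger {
    private final long maxCount;
    private long count = 0; // per-window trigger state

    CountOrTimeTrigger(long maxCount) {
        if (maxCount <= 0) {
            throw new IllegalArgumentException("maxCount must be positive");
        }
        this.maxCount = maxCount;
    }

    // Called for each element added to the window; true means "fire now".
    boolean onElement() {
        count++;
        if (count >= maxCount) {
            count = 0; // start counting the next batch from zero
            return true;
        }
        return false;
    }

    // Called when an event-time timer fires; only the timer registered for
    // the window's max timestamp causes a firing.
    boolean onEventTime(long time, long windowMaxTimestamp) {
        if (time == windowMaxTimestamp) {
            count = 0;
            return true;
        }
        return false;
    }

    long currentCount() {
        return count;
    }
}
```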

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3191#discussion_r97832573

          — Diff: docs/dev/windows.md —
          @@ -622,133 +690,138 @@ input
          </div>
          </div>

          -## Dealing with Late Data
          +## Triggers

          -When working with event-time windowing it can happen that elements arrive late, i.e the
          -watermark that Flink uses to keep track of the progress of event-time is already past the
          -end timestamp of a window to which an element belongs. Please
          -see [event time](./event_time.html) and especially
          -[late elements](./event_time.html#late-elements) for a more thorough discussion of
          -how Flink deals with event time.
          +A `Trigger` determines when a window (as formed by the `WindowAssigner`) is ready to be
          +processed by the window function. Each `WindowAssigner` comes with a default `Trigger`.
          +If the default trigger does not fit your needs, you can specify a custom trigger using `trigger(...)`.

          -You can specify how a windowed transformation should deal with late elements and how much lateness
          -is allowed. The parameter for this is called allowed lateness. This specifies by how much time
          -elements can be late. Elements that arrive within the allowed lateness are still put into windows
          -and are considered when computing window results. If elements arrive after the allowed lateness they
          -will be dropped. Flink will also make sure that any state held by the windowing operation is garbage
          -collected once the watermark passes the end of a window plus the allowed lateness.
          +The trigger interface provides five methods that react to different events:

          -<span class="label label-info">Default</span> By default, the allowed lateness is set to
          -`0`. That is, elements that arrive behind the watermark will be dropped.
          +* The `onElement()` method is called for each element that is added to a window.
          +* The `onEventTime()` method is called when a registered event-time timer fires.
          +* The `onProcessingTime()` method is called when a registered processing-time timer fires.
          +* The `onMerge()` method is relevant for stateful triggers and merges the states of two triggers when their corresponding windows merge, e.g. when using session windows.
          +* Finally the `clear()` method performs any action needed upon removal of the corresponding window.

          -You can specify an allowed lateness like this:
          +Any of these methods can be used to register processing- or event-time timers for future actions.

          -<div class="codetabs" markdown="1">
          -<div data-lang="java" markdown="1">
          -{% highlight java %}
          -DataStream<T> input = ...;
          +### Fire and Purge

          -input
          - .keyBy(<key selector>)
          - .window(<window assigner>)
          - .allowedLateness(<time>)
          - .<windowed transformation>(<window function>);
          -{% endhighlight %}
          -</div>
          +Once a trigger determines that a window is ready for processing, it fires. This is the signal for the window operator to emit the result of the current window. Given a window with a `WindowFunction`
          +all elements are passed to the `WindowFunction` (possibly after passing them to an evictor).
          +Windows with `ReduceFunction` of `FoldFunction` simply emit their eagerly aggregated result.

          -<div data-lang="scala" markdown="1">
          -{% highlight scala %}
          -val input: DataStream[T] = ...
          +When a trigger fires, it can either `FIRE` or `FIRE_AND_PURGE`. While `FIRE` keeps the contents of the window, `FIRE_AND_PURGE` removes its content.
          +By default, the pre-implemented triggers simply `FIRE` without purging the window state.

          -input
          - .keyBy(<key selector>)
          - .window(<window assigner>)
          - .allowedLateness(<time>)
          - .<windowed transformation>(<window function>)
          -{% endhighlight %}
          -</div>
          -</div>
          +<span class="label label-danger">Attention</span> When purging, only the contents of the window are cleared. The window itself is not removed and accepts new elements.

          -<span class="label label-info">Note</span> When using the `GlobalWindows` window assigner no
          -data is ever considered late because the end timestamp of the global window is `Long.MAX_VALUE`.
          +### Default Triggers of WindowAssigners

          -## Triggers
          +The default `Trigger` of a `WindowAssigner` is appropriate for many use cases. For example, all the event-time window assigners have an `EventTimeTrigger` as
          +default trigger. This trigger simply fires once the watermark passes the end of a window.

          -A `Trigger` determines when a window (as assigned by the `WindowAssigner`) is ready for being
          -processed by the window function. The trigger observes how elements are added to windows
          -and can also keep track of the progress of processing time and event time. Once a trigger
          -determines that a window is ready for processing, it fires. This is the signal for the
          -window operation to take the elements that are currently in the window and pass them along to
          -the window function to produce output for the firing window.
          +<span class="label label-danger">Attention</span> The default trigger of the `GlobalWindow` is the `NeverTrigger` which does never fire. Consequently, you always have to define a custom trigger when using a `GlobalWindow`.

          -Each `WindowAssigner` (except `GlobalWindows`) comes with a default trigger that should be
          -appropriate for most use cases. For example, `TumblingEventTimeWindows` has an `EventTimeTrigger` as
          -default trigger. This trigger simply fires once the watermark passes the end of a window.
          +<span class="label label-danger">Attention</span> By specifying a trigger using `trigger()` you
          +are overwriting the default trigger of a `WindowAssigner`. For example, if you specify a
          +`CountTrigger` for `TumblingEventTimeWindows` you will no longer get window firings based on the
          +progress of time but only by count. Right now, you have to write your own custom trigger if
          +you want to react based on both time and count.

          -You can specify the trigger to be used by calling `trigger()` with a given `Trigger`. The
          -whole specification of the windowed transformation would then look like this:
          +### Built-in and Custom Triggers

          -<div class="codetabs" markdown="1">
          -<div data-lang="java" markdown="1">
          -{% highlight java %}
          -DataStream<T> input = ...;
          +Flink comes with a few built-in triggers.

          -input
          - .keyBy(<key selector>)
          - .window(<window assigner>)
          - .trigger(<trigger>)
          - .<windowed transformation>(<window function>);
          -{% endhighlight %}
          -</div>
          +* The (already mentioned) `EventTimeTrigger` fires based on the progress of event-time as measured by watermarks.
          +* The `ProcessingTimeTrigger` fires based on processing time.
          +* The `CountTrigger` which fires once the number of elements in a window exceeds the given limit.
          +* The `PurgingTrigger` takes as argument another trigger and transforms it into a purging one.

          -<div data-lang="scala" markdown="1">
          -{% highlight scala %}
          -val input: DataStream[T] = ...
          +If you need to implement a custom trigger, you should check out the abstract {% gh_link /flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java "Trigger" %} class. Please note that the API is still evolving and might change in future versions of Flink.

          -input
          - .keyBy(<key selector>)
          - .window(<window assigner>)
          - .trigger(<trigger>)
          - .<windowed transformation>(<window function>)
          -{% endhighlight %}
          -</div>
          -</div>

          -Flink comes with a few triggers out-of-box: there is the already mentioned `EventTimeTrigger` that
          -fires based on the progress of event-time as measured by the watermark, the `ProcessingTimeTrigger`
          -does the same but based on processing time and the `CountTrigger` fires once the number of elements
          -in a window exceeds the given limit.
          +## Evictors

          -<span class="label label-danger">Attention</span> By specifying a trigger using `trigger()` you
          -are overwriting the default trigger of a `WindowAssigner`. For example, if you specify a
          -`CountTrigger` for `TumblingEventTimeWindows` you will no longer get window firings based on the
          -progress of time but only by count. Right now, you have to write your own custom trigger if
          -you want to react based on both time and count.
          +Flink’s windowing model allows specifying an optional `Evictor` in addition to the `WindowAssigner` and the `Trigger`.
          +This can be done using the `evictor(...)` method (shown in the beginning of this document). The evictor has the ability
          +to remove elements from a window after the trigger fires and before and/or after the window function is applied.
          +To do so, the `Evictor` interface has two methods:
          +
          + /**
          + * Optionally evicts elements. Called before windowing function.
          + *
          + * @param elements The elements currently in the pane.
          + * @param size The current number of elements in the pane.
          + * @param window The {@link Window}
          + * @param evictorContext The context for the Evictor
          + */
          + void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
          +
          + /**
          + * Optionally evicts elements. Called after windowing function.
          + *
          + * @param elements The elements currently in the pane.
          + * @param size The current number of elements in the pane.
          + * @param window The {@link Window}
          + * @param evictorContext The context for the Evictor
          + */
          + void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
          +
          +The `evictBefore()` contains the eviction logic to be applied before the window function, while the `evictAfter()`
          +contains the one to be applied after the window function. Elements evicted before the application of the window
          +function will not be processed by it.
          +
          +Flink comes with three pre-implemented evictors. These are:
          +
          +* `CountEvictor`: keeps up to a user-specified number of elements from the window and discards the remaining ones from
          +the beginning of the window buffer.
          +* `DeltaEvictor`: takes a `DeltaFunction` and a `threshold`, computes the delta between the last element in the
          +window buffer and each of the remaining ones, and removes the ones with a delta greater or equal to the threshold.
          +* `TimeEvictor`: takes as argument an `interval` in milliseconds and for a given window, it finds the maximum
          +timestamp `max_ts` among its elements and removes all the elements with timestamps smaller than `max_ts - interval`.

          -The internal `Trigger` API is still considered experimental but you can check out the code
          -if you want to write your own custom trigger:
          -{% gh_link /flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java "Trigger.java" %}.
          +<span class="label label-info">Default</span> By default, all the pre-implemented evictors apply their logic before the
          +window function.

          -## Non-keyed Windowing
          +<span class="label label-danger">Attention</span> Specifying an evictor prevents any pre-aggregation, as all the
          +elements of a window have to be passed to the evictor before applying the computation.

          -You can also leave out the `keyBy()` when specifying a windowed transformation. This means, however,
          -that Flink cannot process windows for different keys in parallel, essentially turning the
          -transformation into a non-parallel operation.
          +<span class="label label-danger">Attention</span> Flink provides no guarantees about the order of the elements within
          +a window. This implies that although an evictor may remove elements from the beginning of the window, these are not
          +necessarily the ones that arrive first or last.

          -<span class="label label-danger">Warning</span> As mentioned in the introduction, non-keyed
          -windows have the disadvantage that work cannot be distributed in the cluster because
          -windows cannot be computed independently per key. This can have severe performance implications.

          +## Allowed Lateness

          -The basic structure of a non-keyed windowed transformation is as follows:
          +When working with event-time windowing it can happen that elements arrive late, i.e. the watermark that Flink uses to
          +keep track of the progress of event-time is already past the end timestamp of a window to which an element belongs. See
          +[event time](./event_time.html) and especially [late elements](./event_time.html#late-elements) for a more thorough
          +discussion of how Flink deals with event time.
          +
          +By default, late elements are dropped if their associated window was already evaluated. However,
          +Flink allows to specify a maximum allowed lateness for window operators. Allowed lateness
          +specifies by how much time elements can be late before they are dropped. Elements that arrive
          +within the allowed lateness of a window are still added to the window and trigger an immediate evaluation of the window which might emit elements.
          — End diff –

          "trigger an immediate evaluation" depends on the `Trigger`.
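To make the review point concrete, here is a minimal plain-Java sketch. It is a toy model, not Flink's API: `LatenessSketch`, `ToyTrigger`, `Outcome` and `onElement` are hypothetical names. It assumes that a window's state is kept until the watermark reaches `window end - 1 + allowed lateness`, and that an event-time trigger fires immediately for an element whose window end the watermark has already passed. Both assumptions reflect a reading of `WindowOperator` and `EventTimeTrigger` and may not hold in every Flink version. The sketch shows that allowed lateness only decides whether a late element is kept; whether keeping it causes an immediate (late) firing is up to the trigger.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model (hypothetical, NOT Flink's API) of what happens to an element that
 * arrives for an event-time window [start, end) after the watermark passed its end.
 */
public class LatenessSketch {

    public enum Outcome { DROPPED, ADDED_NO_FIRE, ADDED_AND_FIRED }

    /** Hypothetical stand-in for a trigger's per-element decision. */
    public interface ToyTrigger {
        boolean fireOnElement(long windowMaxTimestamp, long watermark, int elementsInWindow);
    }

    /** Event-time style (assumption): fire once the watermark is at or past the window's last timestamp. */
    public static final ToyTrigger EVENT_TIME =
            (maxTs, watermark, count) -> maxTs <= watermark;

    /** Count style: fire only once the window holds at least `limit` elements. */
    public static ToyTrigger countTrigger(int limit) {
        return (maxTs, watermark, count) -> count >= limit;
    }

    public static Outcome onElement(long windowEnd, long allowedLateness, long watermark,
                                    List<Long> window, long elementTimestamp, ToyTrigger trigger) {
        long maxTimestamp = windowEnd - 1;                 // last timestamp inside [start, end)
        long cleanupTime = maxTimestamp + allowedLateness; // assumed: window state is kept up to here
        if (cleanupTime <= watermark) {
            return Outcome.DROPPED;                        // later than the allowed lateness
        }
        window.add(elementTimestamp);
        // Accepting the element is the lateness rule; firing is the trigger's decision.
        return trigger.fireOnElement(maxTimestamp, watermark, window.size())
                ? Outcome.ADDED_AND_FIRED
                : Outcome.ADDED_NO_FIRE;
    }

    public static void main(String[] args) {
        // Window [0, 10) with 5 ms allowed lateness; the watermark is already at 12.
        System.out.println(onElement(10, 5, 12, new ArrayList<>(), 7, EVENT_TIME));      // ADDED_AND_FIRED
        System.out.println(onElement(10, 5, 12, new ArrayList<>(), 7, countTrigger(3))); // ADDED_NO_FIRE
        // Watermark at 14 reaches 9 + 5, so the window is gone and the element is dropped.
        System.out.println(onElement(10, 5, 14, new ArrayList<>(), 7, EVENT_TIME));      // DROPPED
    }
}
```

The same late element is accepted in both of the first two calls, but only the event-time style trigger turns it into a late firing.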

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3191#discussion_r97833288

          — Diff: docs/dev/windows.md —
          @@ -758,8 +831,33 @@ input
          val input: DataStream[T] = ...

          input
          - .windowAll(<window assigner>)
          + .keyBy(<key selector>)
          + .window(<window assigner>)
          + .allowedLateness(<time>)
          .<windowed transformation>(<window function>)
          {% endhighlight %}
          </div>
          </div>
            +
            +<span class="label label-info">Note</span> When using the `GlobalWindows` window assigner no
            +data is ever considered late because the end timestamp of the global window is `Long.MAX_VALUE`.
            +
            +### Late elements considerations
            +
            +When specifying an allowed lateness greater than 0, the window along with its content is kept after the watermark passes
            +the end of the window. In these cases, when a late but not dropped element arrives, it will trigger another firing for the
            +window. These firings are called `late firings`, as they are triggered by late events and in contrast to the `main firing`
            +which is the first firing of the window. In case of session windows, late firings can further lead to merging of windows,
            +as they may "bridge" the gap between two pre-existing, unmerged windows.
            +
            +<span class="label label-info">Attention</span> You should be aware that the elements emitted by a late firing should be treated as updated results of a previous computation, i.e., your data stream will contain multiple results for the same computation. Depending on your application, you need to take these duplicated results into account or deduplicate them.
            +
            +## Useful state size considerations
            +
            +Windows can be defined over long periods of time (such as days, weeks, or months) and therefore accumulate very large state. There are a couple of rules to keep in mind when estimating the storage requirements of your windowing computation:
            +
            +1. Flink creates one copy of each element per window to which it belongs. Given this, tumbling windows keep one copy of each element (an element belongs to exactly window unless it is dropped late). In contrast, sliding windows create several of each element, as explained in the [Window Assigners](#window-assigners) section. Hence, a sliding window of size 1 day and slide 1 second might not be a good idea.
            +
            +2. `FoldFunction` and `ReduceFunction` can significantly reduce the storage requirements, as they eagerly aggregate elements and store only one value per window. In contrast a `WindowFunction` must accumulate all elements.

          — End diff –

          In contrast, just using a `WindowFunction` requires accumulating all elements.
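A rough sketch of the arithmetic behind the first state-size rule, in plain Java. `WindowStateEstimate`, `windowStarts` and `bufferedCopies` are hypothetical helpers, not part of Flink. The sketch assumes offset 0 and non-negative timestamps. It enumerates windows the way sliding assigners are understood to: start at the latest slide-aligned start not after the element's timestamp, then step back by the slide while the window still contains the element. With eager aggregation (`ReduceFunction`/`FoldFunction`), each of those windows would hold a single aggregate instead of a copy of every element.

```java
import java.util.ArrayList;
import java.util.List;

/** Back-of-the-envelope helper (hypothetical, not part of Flink) for window state size. */
public class WindowStateEstimate {

    /**
     * Start timestamps of all windows [start, start + size) that contain ts.
     * Assumes offset 0 and ts >= 0; slide == size gives tumbling windows.
     */
    public static List<Long> windowStarts(long ts, long size, long slide) {
        if (size <= 0 || slide <= 0 || ts < 0) {
            throw new IllegalArgumentException("size and slide must be positive, ts non-negative");
        }
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - (ts % slide); // latest slide-aligned start not after ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    /**
     * Buffered element copies if a plain WindowFunction keeps every element in every
     * window it belongs to (exact when size is a multiple of slide).
     */
    public static long bufferedCopies(long elements, long size, long slide) {
        return elements * windowStarts(0, size, slide).size();
    }

    public static void main(String[] args) {
        long second = 1_000L;
        long day = 24 * 60 * 60 * second;
        System.out.println(windowStarts(12_345, 5 * second, 5 * second).size()); // tumbling: 1
        System.out.println(windowStarts(12_345, day, second).size());            // 86400
        System.out.println(bufferedCopies(1_000_000, day, second));              // 86400000000
    }
}
```

With a window of 1 day sliding by 1 second, each buffered element ends up in 86,400 windows. This is why that configuration is singled out in the diff above.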

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3191#discussion_r97826641

          — Diff: docs/dev/windows.md —
          @@ -204,72 +221,120 @@ input

          {% highlight scala %}

          val input: DataStream[T] = ...

          -// tumbling event-time windows
          -input
          - .keyBy(<key selector>)
          - .window(TumblingEventTimeWindows.of(Time.seconds(5)))
          - .<windowed transformation>(<window function>)
          -
          // sliding event-time windows
          input
          .keyBy(<key selector>)
          .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
          .<windowed transformation>(<window function>)

          -// event-time session windows
          +// sliding processing-time windows
          input
          .keyBy(<key selector>)
          - .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
          + .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
          .<windowed transformation>(<window function>)

          -// tumbling processing-time windows
          +// sliding processing-time windows offset by -8 hours
          input
          .keyBy(<key selector>)
          - .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
          + .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
          .<windowed transformation>(<window function>)
          +{% endhighlight %}
          +</div>
          +</div>

          -// sliding processing-time windows
          +Time intervals can be specified by using one of `Time.milliseconds`, `Time.seconds`,
          +`Time.minutes`, and so on.
          +
          +As shown in the last example, sliding window assigners also take an optional `offset` parameter
          +that can be used to change the alignment of windows. For example, without offsets hourly windows
          +sliding by 30 minutes are aligned with epoch, that is you will get windows such as
          +`1:00:00.000 - 1:59:59.999`, `1:30:00.000 - 2:29:59.999` and so on. If you want to change that
          +you can give an offset. With an offset of 15 minutes you would, for example, get
          +`1:15:00.000 - 2:14:59.999`, `1:45:00.000 - 2:44:59.999` etc.
          +An important use case for offsets is to adjust windows to timezones other than UTC-0.
          +For example, in China you would have to specify an offset of `Time.hours(-8)`.
          +
          +### Session Windows
          +
          +The session windows assigner groups elements by sessions of activity. Session windows do not overlap and
          +do not have a fixed start and end time in contrast to tumbling windows and sliding windows. Instead a
          +session window assigner closes a window when it does not receive elements for a certain period
          +of time, i.e., when a gap of inactivity occurred. A session window assigner is configured with the session gap which
          — End diff –

          This sentence looks a bit problematic since it is not the assigner that decides when to close a window and when to open a new one. I see that it makes sense from an explanation point of view but it misrepresents how it actually works.
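To make the reviewer's point concrete: in the model below, no component decides when a session ends. Every element simply gets its own window `[ts, ts + gap)`, and sessions emerge from merging overlapping windows. This is a standalone Java sketch, not Flink code; the names `SessionWindowSketch` and `sessions` are hypothetical, and treating windows that merely touch as mergeable is an assumption about the boundary convention.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical standalone model of session windowing; this is not Flink code.
final class SessionWindowSketch {

    /** A half-open event-time interval [start, end). */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /**
     * Gives every timestamp its own window [ts, ts + gap) and merges windows that
     * overlap or touch. No step here "closes" a session: a session ends simply
     * because no later window overlaps it.
     */
    static List<Window> sessions(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<Window> merged = new ArrayList<>();
        for (long ts : sorted) {
            Window candidate = new Window(ts, ts + gap); // per-element window
            int lastIndex = merged.size() - 1;
            if (lastIndex >= 0 && candidate.start <= merged.get(lastIndex).end) {
                Window last = merged.get(lastIndex);
                // Overlapping (or touching) windows are merged into one session.
                merged.set(lastIndex, new Window(last.start, Math.max(last.end, candidate.end)));
            } else {
                merged.add(candidate); // gap of inactivity: a separate session
            }
        }
        return merged;
    }
}
```

For example, `sessions(new long[]{1, 3, 20, 4, 30}, 5)` yields `[1, 9)`, `[20, 25)` and `[30, 35)`. Adding an element with timestamp 25 bridges the last two sessions into a single `[20, 35)` window, which is the kind of merge a late element can cause for session windows.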

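The offset paragraph in the diff can be checked with a little arithmetic. The sketch below computes, for one timestamp, the start of every sliding window that contains it, shifting the epoch-aligned grid by `offset`. It is a hypothetical standalone model (`SlidingWindowAlignment` is not a Flink class). It uses minutes as the time unit for readability and assumes `0 <= offset < slide` and non-negative timestamps so that Java's `%` stays non-negative; negative offsets such as the `Time.hours(-8)` example are therefore not modeled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of sliding-window alignment with an offset; not Flink code.
final class SlidingWindowAlignment {

    /** Start of the latest slide-aligned window (shifted by offset) that contains ts. */
    static long lastWindowStart(long ts, long offset, long slide) {
        // Assumes ts >= 0 and 0 <= offset < slide, so the remainder is non-negative.
        return ts - ((ts - offset + slide) % slide);
    }

    /** Start times of all windows [start, start + size) that contain ts, latest first. */
    static List<Long> windowStarts(long ts, long size, long slide, long offset) {
        List<Long> starts = new ArrayList<>();
        for (long start = lastWindowStart(ts, offset, slide); start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }
}
```

With size 60, slide 30 and timestamp 100 (that is, 1:40), no offset gives windows starting at 90 and 60 (1:30 and 1:00), while an offset of 15 gives 75 and 45 (1:15 and 0:45), matching the `1:15:00.000 - 2:14:59.999` example. Each timestamp lands in `size / slide` windows, here two.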
          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3191#discussion_r97827610

          — Diff: docs/dev/windows.md —
          @@ -622,133 +690,138 @@ input
          </div>
          </div>

          -## Dealing with Late Data
          +## Triggers

          -When working with event-time windowing it can happen that elements arrive late, i.e the
          -watermark that Flink uses to keep track of the progress of event-time is already past the
          -end timestamp of a window to which an element belongs. Please
          -see [event time](./event_time.html) and especially
          -[late elements](./event_time.html#late-elements) for a more thorough discussion of
          -how Flink deals with event time.
          +A `Trigger` determines when a window (as formed by the `WindowAssigner`) is ready to be
          — End diff –

          Maybe have `window assigner` here because we also have `window function`.
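Keeping the three terms parallel (window assigner, trigger, window function) also maps cleanly onto code. The sketch below separates the roles: a tumbling window assigner maps a timestamp to a window, an event-time trigger decides readiness from the watermark, and a summing window function produces the result. It is an illustrative Java model under assumptions, not Flink's API: `WindowPipelineSketch` and its methods are hypothetical, and the readiness check assumes a window `[start, start + size)` is complete once the watermark reaches `start + size - 1`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the three windowing roles; not Flink code.
final class WindowPipelineSketch {

    /** Window assigner: maps a timestamp to the start of its tumbling window. */
    static long assignTumbling(long ts, long size) {
        return ts - (ts % size); // assumes ts >= 0
    }

    /** Trigger: an event-time window is ready once the watermark reaches its last timestamp. */
    static boolean isReady(long start, long size, long watermark) {
        return watermark >= start + size - 1;
    }

    /** Buffers values per window and emits the sums of windows the watermark has completed. */
    static List<String> run(long[] timestamps, int[] values, long size, long watermark) {
        Map<Long, Integer> sums = new TreeMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            long start = assignTumbling(timestamps[i], size);
            sums.merge(start, values[i], Integer::sum); // window function: incremental sum
        }
        List<String> fired = new ArrayList<>();
        for (Map.Entry<Long, Integer> e : sums.entrySet()) {
            if (isReady(e.getKey(), size, watermark)) {
                fired.add("[" + e.getKey() + ", " + (e.getKey() + size) + ") -> " + e.getValue());
            }
        }
        return fired;
    }
}
```

`run(new long[]{1, 4, 6, 12}, new int[]{1, 2, 3, 4}, 5, 9)` emits `[0, 5) -> 3` and `[5, 10) -> 3`; the window `[10, 15)` stays buffered because the watermark has not reached its end.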

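The `FIRE` versus `FIRE_AND_PURGE` distinction in the revised Triggers section is easiest to see side by side. Below is a hypothetical single-window simulation (not Flink code; `FirePurgeSketch` is an illustrative name) whose trigger fires every `n` elements. With `purge` set, the contents are cleared after each firing, while the window itself keeps accepting elements.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of FIRE vs. FIRE_AND_PURGE on one window; not Flink code.
final class FirePurgeSketch {

    /**
     * Feeds values into a single window whose trigger fires every n elements and
     * returns the sum emitted at each firing. If purge is true, the window contents
     * are cleared after each firing (FIRE_AND_PURGE); otherwise they are kept (FIRE).
     */
    static List<Integer> emittedSums(int[] values, int n, boolean purge) {
        List<Integer> contents = new ArrayList<>();
        List<Integer> emitted = new ArrayList<>();
        int sinceLastFire = 0;
        for (int v : values) {
            contents.add(v);
            if (++sinceLastFire == n) {   // trigger condition reached
                sinceLastFire = 0;
                int sum = 0;
                for (int c : contents) {
                    sum += c;             // window function over the current contents
                }
                emitted.add(sum);
                if (purge) {
                    contents.clear();     // only the contents go; the window stays open
                }
            }
        }
        return emitted;
    }
}
```

For inputs `1..6` and `n = 2`, keeping the contents emits the running totals `3, 10, 21`, whereas purging emits `3, 7, 11`. With `FIRE`, each result covers everything seen so far in the window; with `FIRE_AND_PURGE`, each result covers only the elements since the previous firing.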
          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3191#discussion_r97832399

          — Diff: docs/dev/windows.md —
          @@ -622,133 +690,138 @@ input
          </div>
          </div>

          -## Dealing with Late Data
          +## Triggers

          -When working with event-time windowing it can happen that elements arrive late, i.e the
          -watermark that Flink uses to keep track of the progress of event-time is already past the
          -end timestamp of a window to which an element belongs. Please
          -see [event time](./event_time.html) and especially
          -[late elements](./event_time.html#late-elements) for a more thorough discussion of
          -how Flink deals with event time.
          +A `Trigger` determines when a window (as formed by the `WindowAssigner`) is ready to be
          +processed by the window function. Each `WindowAssigner` comes with a default `Trigger`.
          +If the default trigger does not fit your needs, you can specify a custom trigger using `trigger(...)`.

          -You can specify how a windowed transformation should deal with late elements and how much lateness
          -is allowed. The parameter for this is called allowed lateness. This specifies by how much time
          -elements can be late. Elements that arrive within the allowed lateness are still put into windows
          -and are considered when computing window results. If elements arrive after the allowed lateness they
          -will be dropped. Flink will also make sure that any state held by the windowing operation is garbage
          -collected once the watermark passes the end of a window plus the allowed lateness.
          +The trigger interface provides five methods that react to different events:

          -<span class="label label-info">Default</span> By default, the allowed lateness is set to
          -`0`. That is, elements that arrive behind the watermark will be dropped.
          +* The `onElement()` method is called for each element that is added to a window.
          +* The `onEventTime()` method is called when a registered event-time timer fires.
          +* The `onProcessingTime()` method is called when a registered processing-time timer fires.
          +* The `onMerge()` method is relevant for stateful triggers and merges the states of two triggers when their corresponding windows merge, e.g. when using session windows.
          +* Finally the `clear()` method performs any action needed upon removal of the corresponding window.

          -You can specify an allowed lateness like this:
          +Any of these methods can be used to register processing- or event-time timers for future actions.

          -<div class="codetabs" markdown="1">
          -<div data-lang="java" markdown="1">
          -{% highlight java %}
          -DataStream<T> input = ...;
          +### Fire and Purge

          -input
          - .keyBy(<key selector>)
          - .window(<window assigner>)
          - .allowedLateness(<time>)
          - .<windowed transformation>(<window function>);
          -{% endhighlight %}
          -</div>
          +Once a trigger determines that a window is ready for processing, it fires. This is the signal for the window operator to emit the result of the current window. Given a window with a `WindowFunction`
          +all elements are passed to the `WindowFunction` (possibly after passing them to an evictor).
          +Windows with `ReduceFunction` or `FoldFunction` simply emit their eagerly aggregated result.

          -<div data-lang="scala" markdown="1">
          -{% highlight scala %}
          -val input: DataStream[T] = ...
          +When a trigger fires, it can either `FIRE` or `FIRE_AND_PURGE`. While `FIRE` keeps the contents of the window, `FIRE_AND_PURGE` removes its content.
          +By default, the pre-implemented triggers simply `FIRE` without purging the window state.

          -input
          - .keyBy(<key selector>)
          - .window(<window assigner>)
          - .allowedLateness(<time>)
          - .<windowed transformation>(<window function>)
          -{% endhighlight %}
          -</div>
          -</div>
          +<span class="label label-danger">Attention</span> When purging, only the contents of the window are cleared. The window itself is not removed and accepts new elements.

          -<span class="label label-info">Note</span> When using the `GlobalWindows` window assigner no
          -data is ever considered late because the end timestamp of the global window is `Long.MAX_VALUE`.
          +### Default Triggers of WindowAssigners

          -## Triggers
          +The default `Trigger` of a `WindowAssigner` is appropriate for many use cases. For example, all the event-time window assigners have an `EventTimeTrigger` as
          +default trigger. This trigger simply fires once the watermark passes the end of a window.

          -A `Trigger` determines when a window (as assigned by the `WindowAssigner`) is ready for being
          -processed by the window function. The trigger observes how elements are added to windows
          -and can also keep track of the progress of processing time and event time. Once a trigger
          -determines that a window is ready for processing, it fires. This is the signal for the
          -window operation to take the elements that are currently in the window and pass them along to
          -the window function to produce output for the firing window.
          +<span class="label label-danger">Attention</span> The default trigger of the `GlobalWindow` is the `NeverTrigger` which never fires. Consequently, you always have to define a custom trigger when using a `GlobalWindow`.

          -Each `WindowAssigner` (except `GlobalWindows`) comes with a default trigger that should be
          -appropriate for most use cases. For example, `TumblingEventTimeWindows` has an `EventTimeTrigger` as
          -default trigger. This trigger simply fires once the watermark passes the end of a window.
          +<span class="label label-danger">Attention</span> By specifying a trigger using `trigger()` you
          +are overwriting the default trigger of a `WindowAssigner`. For example, if you specify a
          +`CountTrigger` for `TumblingEventTimeWindows` you will no longer get window firings based on the
          +progress of time but only by count. Right now, you have to write your own custom trigger if
          +you want to react based on both time and count.

          -You can specify the trigger to be used by calling `trigger()` with a given `Trigger`. The
          -whole specification of the windowed transformation would then look like this:
          +### Built-in and Custom Triggers

          -<div class="codetabs" markdown="1">
          -<div data-lang="java" markdown="1">
          -{% highlight java %}

          -DataStream<T> input = ...;
          +Flink comes with a few built-in triggers.

          -input

          - .keyBy(<key selector>)
          - .window(<window assigner>)
          - .trigger(<trigger>)
          - .<windowed transformation>(<window function>);
          -{% endhighlight %}
          -</div>
          +* The (already mentioned) `EventTimeTrigger` fires based on the progress of event-time as measured by watermarks.
          +* The `ProcessingTimeTrigger` fires based on processing time.
          +* The `CountTrigger` fires once the number of elements in a window exceeds the given limit.
          +* The `PurgingTrigger` takes as argument another trigger and transforms it into a purging one.

          -<div data-lang="scala" markdown="1">
          -{% highlight scala %}
          -val input: DataStream[T] = ...
          +If you need to implement a custom trigger, you should check out the abstract {% gh_link /flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java "Trigger" %} class. Please note that the API is still evolving and might change in future versions of Flink.

          -input
          - .keyBy(<key selector>)
          - .window(<window assigner>)
          - .trigger(<trigger>)
          - .<windowed transformation>(<window function>)
          -{% endhighlight %}

          -</div>
          -</div>

          -Flink comes with a few triggers out-of-box: there is the already mentioned `EventTimeTrigger` that
          -fires based on the progress of event-time as measured by the watermark, the `ProcessingTimeTrigger`
          -does the same but based on processing time and the `CountTrigger` fires once the number of elements
          -in a window exceeds the given limit.
          +## Evictors

          -<span class="label label-danger">Attention</span> By specifying a trigger using `trigger()` you
          -are overwriting the default trigger of a `WindowAssigner`. For example, if you specify a
          -`CountTrigger` for `TumblingEventTimeWindows` you will no longer get window firings based on the
          -progress of time but only by count. Right now, you have to write your own custom trigger if
          -you want to react based on both time and count.
          +Flink’s windowing model allows specifying an optional `Evictor` in addition to the `WindowAssigner` and the `Trigger`.
          +This can be done using the `evictor(...)` method (shown in the beginning of this document). The evictor has the ability
          +to remove elements from a window after the trigger fires and before and/or after the window function is applied.
          +To do so, the `Evictor` interface has two methods:
          +
          + /**
          + * Optionally evicts elements. Called before windowing function.
          + *
          + * @param elements The elements currently in the pane.
          + * @param size The current number of elements in the pane.
          + * @param window The {@link Window}
          + * @param evictorContext The context for the Evictor
          + */
          + void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
          +
          + /**
          + * Optionally evicts elements. Called after windowing function.
          + *
          + * @param elements The elements currently in the pane.
          + * @param size The current number of elements in the pane.
          + * @param window The {@link Window}
          + * @param evictorContext The context for the Evictor
          + */
          + void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
          +
          +The `evictBefore()` contains the eviction logic to be applied before the window function, while the `evictAfter()`
          +contains the one to be applied after the window function. Elements evicted before the application of the window
          +function will not be processed by it.
          +
          +Flink comes with three pre-implemented evictors. These are:
          +
          +* `CountEvictor`: keeps up to a user-specified number of elements from the window and discards the remaining ones from
          +the beginning of the window buffer.
          +* `DeltaEvictor`: takes a `DeltaFunction` and a `threshold`, computes the delta between the last element in the
          +window buffer and each of the remaining ones, and removes the ones with a delta greater or equal to the threshold.
          +* `TimeEvictor`: takes as argument an `interval` in milliseconds and for a given window, it finds the maximum
          +timestamp `max_ts` among its elements and removes all the elements with timestamps smaller than `max_ts - interval`.

          -The internal `Trigger` API is still considered experimental but you can check out the code
          -if you want to write your own custom trigger:
          -{% gh_link /flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java "Trigger.java" %}.
          +<span class="label label-info">Default</span> By default, all the pre-implemented evictors apply their logic before the
          +window function.

          -## Non-keyed Windowing
          +<span class="label label-danger">Attention</span> Specifying an evictor prevents any pre-aggregation, as all the
          +elements of a window have to be passed to the evictor before applying the computation.

          -You can also leave out the `keyBy()` when specifying a windowed transformation. This means, however,
          -that Flink cannot process windows for different keys in parallel, essentially turning the
          -transformation into a non-parallel operation.
          +<span class="label label-danger">Attention</span> Flink provides no guarantees about the order of the elements within
          +a window. This implies that although an evictor may remove elements from the beginning of the window, these are not
          +necessarily the ones that arrive first or last.

          -<span class="label label-danger">Warning</span> As mentioned in the introduction, non-keyed
          -windows have the disadvantage that work cannot be distributed in the cluster because
          -windows cannot be computed independently per key. This can have severe performance implications.

          +## Allowed Lateness

          -The basic structure of a non-keyed windowed transformation is as follows:
          +When working with event-time windowing it can happen that elements arrive late, i.e. the watermark that Flink uses to
          +keep track of the progress of event-time is already past the end timestamp of a window to which an element belongs. See
          +[event time](./event_time.html) and especially [late elements](./event_time.html#late-elements) for a more thorough
          +discussion of how Flink deals with event time.
          +
          +By default, late elements are dropped if their associated window was already evaluated. However,
          — End diff –

          They are not dropped when the window was already evaluated but when the watermark is past the end of the window plus the allowed lateness.
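A minimal sketch of the rule the reviewer describes: whether an element is dropped depends on the watermark relative to the end of its window plus the allowed lateness, not on whether the window has already fired. The classification below is a hypothetical Java model, not Flink code (`LatenessSketch` and `Fate` are illustrative names). The boundary convention, in which a window's last timestamp is `end - 1` and the comparisons are as written, is an assumption.

```java
// Hypothetical model of allowed-lateness handling for one event-time window; not Flink code.
final class LatenessSketch {

    enum Fate { ON_TIME, LATE_ACCEPTED, DROPPED }

    /**
     * Classifies an element belonging to the window [start, end), given the current
     * watermark and the allowed lateness (all in the same time unit).
     * Assumed convention: the window's last timestamp is end - 1, and its state is
     * kept until the watermark reaches (end - 1) + allowedLateness.
     */
    static Fate classify(long end, long allowedLateness, long watermark) {
        long maxTimestamp = end - 1;
        if (watermark < maxTimestamp) {
            return Fate.ON_TIME;       // main firing has not happened yet
        }
        if (watermark < maxTimestamp + allowedLateness) {
            return Fate.LATE_ACCEPTED; // window still kept: the element causes a late firing
        }
        return Fate.DROPPED;           // watermark past end of window plus allowed lateness
    }
}
```

For the window `[0, 10)` with zero allowed lateness, a watermark of 9 already means the element is dropped. With an allowed lateness of 5, a watermark of 12 still accepts it and causes a late firing, and only from 14 onwards is it dropped.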

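The two simplest pre-implemented evictors described in the diff can be sketched as plain list operations. This is an illustrative Java model that follows the wording of the diff (`CountEvictor` discards from the beginning of the buffer; `TimeEvictor` removes timestamps smaller than `max_ts - interval`). `EvictorSketch` and its methods are hypothetical and do not reproduce Flink's `Evictor` interface.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical models of count- and time-based eviction; not Flink code.
final class EvictorSketch {

    /** Keeps at most maxCount elements, discarding the surplus from the beginning of the buffer. */
    static List<Long> countEvict(List<Long> buffer, int maxCount) {
        int toDrop = Math.max(0, buffer.size() - maxCount);
        return new ArrayList<>(buffer.subList(toDrop, buffer.size()));
    }

    /** Removes every timestamp smaller than maxTs - interval, where maxTs is the largest one present. */
    static List<Long> timeEvict(List<Long> timestamps, long interval) {
        long maxTs = Long.MIN_VALUE;
        for (long ts : timestamps) {
            maxTs = Math.max(maxTs, ts);
        }
        List<Long> kept = new ArrayList<>();
        for (long ts : timestamps) {
            if (ts >= maxTs - interval) {
                kept.add(ts); // within interval of the newest timestamp
            }
        }
        return kept;
    }
}
```

For a buffer `[5, 1, 9, 3]`, `countEvict(buffer, 2)` keeps `[9, 3]` and `timeEvict(buffer, 4)` keeps `[5, 9]`. Because the buffer order need not be the arrival order, the elements removed from the beginning are not necessarily the oldest ones, which is the caveat raised in the Attention note on element order.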
          Hide
          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/3191

          @aljoscha thanks for the review. I integrated the comments.
          I am also including @fhueske, as he also contributed to this.

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3191#discussion_r98026366

          — Diff: docs/dev/windows.md —
          @@ -312,9 +328,9 @@ Time intervals can be specified by using one of `Time.milliseconds`, `Time.se
          they are evaluated differently than tumbling and sliding windows. Internally, a session window operator
          creates a new window for each arriving record and merges windows together if they are closer to each other
          than the defined gap.
          -In order to be mergable, a session window operator requires a mergable [Trigger](#triggers) and a mergable
          +In order to be mergeable, a session window operator requires a merging [Trigger](#triggers) and a merging
          [Window Function](#window-functions), such as `ReduceFunction` or `WindowFunction`
          -(`FoldFunction` is not mergable.)
          +(`FoldFunction` is not mergeable.)
          — End diff –

          "`FoldFunction` cannot merge"?
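The merging described in this hunk can be sketched in plain Java. This is a simplified model, not Flink's implementation: `SessionWindow` and `SessionMerging` are hypothetical names, each record first gets its own window `[ts, ts + gap)`, and the sketch assumes that windows which merely touch are merged as well.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical model (not Flink's API): a session window is the half-open
// interval [start, end) in milliseconds.
final class SessionWindow {
    final long start;
    final long end;

    SessionWindow(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // Assumption: windows that overlap or touch are merged.
    boolean intersects(SessionWindow other) {
        return start <= other.end && end >= other.start;
    }

    // Smallest window covering both this window and the other one.
    SessionWindow cover(SessionWindow other) {
        return new SessionWindow(Math.min(start, other.start), Math.max(end, other.end));
    }

    @Override
    public String toString() {
        return "[" + start + ", " + end + ")";
    }
}

final class SessionMerging {
    // Every record starts its own window [ts, ts + gap); windows are then
    // sorted by start and overlapping neighbours are merged into one session.
    static List<SessionWindow> sessions(List<Long> timestamps, long gap) {
        List<SessionWindow> windows = new ArrayList<>();
        for (long ts : timestamps) {
            windows.add(new SessionWindow(ts, ts + gap));
        }
        windows.sort(Comparator.comparingLong(w -> w.start));

        List<SessionWindow> merged = new ArrayList<>();
        for (SessionWindow w : windows) {
            int last = merged.size() - 1;
            if (last >= 0 && merged.get(last).intersects(w)) {
                merged.set(last, merged.get(last).cover(w));
            } else {
                merged.add(w);
            }
        }
        return merged;
    }
}
```

For example, timestamps 1, 3 and 20 with a gap of 5 produce the sessions [1, 8) and [20, 25).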

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3191#discussion_r98025666

          — Diff: docs/dev/windows.md —
          @@ -86,9 +101,10 @@ will be performed by a single task, i.e. with parallelism of 1.

          ## Window Assigners

          -After specifying whether your stream is keyed or not, the next step is to define a windowing strategy.
          -The windowing strategy defines how elements are assigned to windows. This is done by specifying the
          -`WindowAssigner` that corresponds to the windowing strategy of your choice in the `window(...)` call.
          +After specifying whether your stream is keyed or not, the next step is to define a window assigner.
          +The window assigner defines how elements are assigned to windows. This is done by specifying the
          +`WindowAssigner` that corresponds to the windowing strategy of your choice in the `window(...)` (for keyed streams)
          — End diff –

          "This is done by specifying the `WindowAssigner` of your choice in the `window(...)` ...", i.e. remove "windowing strategy"
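As an illustration of what "assigning elements to windows" means for a tumbling window, the sketch below computes the window that a timestamp falls into for a given size and offset. `TumblingAssigner` is a hypothetical class, not Flink's `WindowAssigner`; the formula is an assumption chosen for the example, and `Math.floorMod` is used so that negative timestamps or offsets still land in the correct window.

```java
// Hypothetical model of a tumbling event-time window assigner (not Flink's API).
// Windows are half-open intervals [start, start + size) shifted by an offset.
final class TumblingAssigner {
    private final long size;
    private final long offset;

    TumblingAssigner(long size, long offset) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        this.size = size;
        this.offset = offset;
    }

    // Start of the single window that contains the given timestamp.
    long windowStart(long timestamp) {
        return timestamp - Math.floorMod(timestamp - offset, size);
    }

    // Exclusive end of that window.
    long windowEnd(long timestamp) {
        return windowStart(timestamp) + size;
    }
}
```

With a size of 5 and no offset, timestamp 7 is assigned to [5, 10); with an offset of 2, timestamp 6 is assigned to [2, 7).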

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3191#discussion_r98026721

          — Diff: docs/dev/windows.md —
          @@ -692,30 +708,39 @@ input

          ## Triggers

          -A `Trigger` determines when a window (as formed by the `WindowAssigner`) is ready to be
          +A `Trigger` determines when a window (as formed by the window assigner) is ready to be
          processed by the window function. Each `WindowAssigner` comes with a default `Trigger`.
          If the default trigger does not fit your needs, you can specify a custom trigger using `trigger(...)`.

          -The trigger interface provides five methods that react to different events:
          +The trigger interface has five methods that allow a `Trigger` to react to different events:

          • The `onElement()` method is called for each element that is added to a window.
          • The `onEventTime()` method is called when a registered event-time timer fires.
          • The `onProcessingTime()` method is called when a registered processing-time timer fires.
          • The `onMerge()` method is relevant for stateful triggers and merges the states of two triggers when their corresponding windows merge, e.g. when using session windows.
          • Finally the `clear()` method performs any action needed upon removal of the corresponding window.

          Any of these methods can be used to register processing or event-time timers for future actions.
          +Two things to notice about the above methods are:
          +
          +1) The first three can return a `TriggerResult`, i.e. take action as a response to their corresponding event. The action can be one of the following:
          — End diff –

          "The first three decide how to act on their invocation event by returning a `TriggerResult`."
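The contract can be illustrated with a small, self-contained model. `TriggerAction` and `CountTriggerModel` are hypothetical names, not Flink classes. The sketch shows the four possible results as combinations of "fire" and "purge", and an `onElement()`-style callback that returns `CONTINUE` until a count is reached and then `FIRE`, keeping its count as per-window trigger state.

```java
// Hypothetical model (not Flink's classes): the four results a trigger callback can return.
enum TriggerAction {
    CONTINUE(false, false),
    FIRE(true, false),
    PURGE(false, true),
    FIRE_AND_PURGE(true, true);

    final boolean fire;   // evaluate the window function
    final boolean purge;  // clear the window contents

    TriggerAction(boolean fire, boolean purge) {
        this.fire = fire;
        this.purge = purge;
    }
}

// Count-based trigger sketch for a single window.
final class CountTriggerModel {
    private final long maxCount;
    private long count; // per-window trigger state

    CountTriggerModel(long maxCount) {
        this.maxCount = maxCount;
    }

    // Called once per element added to the window.
    TriggerAction onElement() {
        count++;
        if (count >= maxCount) {
            count = 0; // reset the trigger state after firing
            return TriggerAction.FIRE;
        }
        return TriggerAction.CONTINUE;
    }
}
```

With a limit of 3, the first two elements yield `CONTINUE`, the third yields `FIRE`, and counting then starts over.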

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3191#discussion_r98025175

          — Diff: docs/dev/windows.md —
          @@ -27,9 +27,13 @@ Windows are at the heart of processing infinite streams. Windows split the strea
          over which we can apply computations. This document focuses on how windowing is performed in Flink and how the
          programmer can benefit to the maximum from its offered functionality.

          -The general structure of a windowed Flink program is presented below. This is also going to serve as a roadmap for
          -the rest of the page.
          +The general structure of a windowed Flink program is presented below. The first snippet refers to keyed streams,
          +while the second to non-keyed ones. As one can see, the only difference is the `keyBy(...)` call for the keyed streams
          +and the `window(...)` which becomes `windowAll(...)` for non-keyed streams. This is also going to serve as a roadmap
          +for the rest of the page.

          + //---------------------- KEYED STREAMS ----------------------//
          — End diff –

          `KEYED STREAMS` -> `KEYED WINDOWS` as in the section heading below
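The difference between keyed and non-keyed windows can be sketched without Flink: with a key, every (key, window) pair is an independent unit of work that could be handled by a different parallel instance; without a key, there is one window per time range for the whole stream. `KeyedVsNonKeyed` and `Event` are hypothetical names, the windows are assumed to be tumbling with zero offset, and the "window function" is simply a count.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: counting events per tumbling window, with and without a key.
final class KeyedVsNonKeyed {
    // An event carries a key and an event-time timestamp (milliseconds).
    static final class Event {
        final String key;
        final long timestamp;

        Event(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    // Keyed: each (key, windowStart) pair is evaluated independently,
    // so the work could be split across parallel instances by key.
    static Map<String, Map<Long, Integer>> keyedCounts(List<Event> events, long size) {
        Map<String, Map<Long, Integer>> result = new HashMap<>();
        for (Event e : events) {
            long start = e.timestamp - Math.floorMod(e.timestamp, size);
            result.computeIfAbsent(e.key, k -> new HashMap<>()).merge(start, 1, Integer::sum);
        }
        return result;
    }

    // Non-keyed (windowAll): a single window per time range for the whole
    // stream, which one instance has to compute on its own.
    static Map<Long, Integer> globalCounts(List<Event> events, long size) {
        Map<Long, Integer> result = new HashMap<>();
        for (Event e : events) {
            long start = e.timestamp - Math.floorMod(e.timestamp, size);
            result.merge(start, 1, Integer::sum);
        }
        return result;
    }
}
```

For events `a@1`, `b@2` and `a@7` with a window size of 5, the keyed version yields separate counts for `a` and `b`, while the non-keyed version counts 2 elements in [0, 5) and 1 in [5, 10).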

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3191#discussion_r98028299

          — Diff: docs/dev/windows.md —
          @@ -692,30 +708,39 @@ input

          ## Triggers

          -A `Trigger` determines when a window (as formed by the `WindowAssigner`) is ready to be
          +A `Trigger` determines when a window (as formed by the window assigner) is ready to be
          processed by the window function. Each `WindowAssigner` comes with a default `Trigger`.
          If the default trigger does not fit your needs, you can specify a custom trigger using `trigger(...)`.

          -The trigger interface provides five methods that react to different events:
          +The trigger interface has five methods that allow a `Trigger` to react to different events:

          • The `onElement()` method is called for each element that is added to a window.
          • The `onEventTime()` method is called when a registered event-time timer fires.
          • The `onProcessingTime()` method is called when a registered processing-time timer fires.
          • The `onMerge()` method is relevant for stateful triggers and merges the states of two triggers when their corresponding windows merge, e.g. when using session windows.
          • Finally the `clear()` method performs any action needed upon removal of the corresponding window.

          Any of these methods can be used to register processing or event-time timers for future actions.
          +Two things to notice about the above methods are:
          +
          +1) The first three can return a `TriggerResult`, i.e. take action as a response to their corresponding event. The action can be one of the following:
          +* `CONTINUE`: do nothing,
          +* `FIRE`: trigger the computation,
          +* `PURGE`: clear the elements in the window, and
          +* `FIRE_AND_PURGE`: take both previous actions.
          +
          +2) Any of these methods can be used to register processing- or event-time timers for future actions.

          ### Fire and Purge

          -Once a trigger determines that a window is ready for processing, it fires. This is the signal for the window operator to emit the result of the current window. Given a window with a `WindowFunction`
          +Once a trigger determines that a window is ready for processing, it fires, i.e., it returns `FIRE` or `FIRE_AND_PURGE`. This is the signal for the window operator
          +to emit the result of the current window. Given a window with a `WindowFunction`
          all elements are passed to the `WindowFunction` (possibly after passing them to an evictor).
          Windows with `ReduceFunction` or `FoldFunction` simply emit their eagerly aggregated result.

          When a trigger fires, it can either `FIRE` or `FIRE_AND_PURGE`. While `FIRE` keeps the contents of the window, `FIRE_AND_PURGE` removes its content.
          By default, the pre-implemented triggers simply `FIRE` without purging the window state.

          -<span class="label label-danger">Attention</span> When purging, only the contents of the window are cleared. The window itself is not removed and accepts new elements.
          +<span class="label label-danger">Attention</span> Purging will simply remove the contents of the window and will leave any eventual meta-information about the window and any trigger state intact.
          — End diff –

          "eventual" -> "potential"?
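The practical difference between `FIRE` and `FIRE_AND_PURGE` is whether a later firing of the same window still sees the earlier elements. The sketch below is a hypothetical model (`WindowBuffer` is not a Flink class) with a sum standing in for the window function; as the hunk above notes, purging only clears the contents, and window metadata or trigger state are not modeled here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model (not Flink's API) of the contents of one window.
final class WindowBuffer {
    private final List<Integer> contents = new ArrayList<>();

    void add(int element) {
        contents.add(element);
    }

    // FIRE: evaluate the window function (here: a sum) and keep the contents.
    int fire() {
        return sum();
    }

    // FIRE_AND_PURGE: evaluate, then clear the contents only.
    int fireAndPurge() {
        int result = sum();
        contents.clear();
        return result;
    }

    int size() {
        return contents.size();
    }

    private int sum() {
        int total = 0;
        for (int x : contents) {
            total += x;
        }
        return total;
    }
}
```

After adding 1 and 2, `fire()` returns 3; adding 4 and firing again returns 7 because the contents were kept. `fireAndPurge()` also returns 7 but empties the buffer, so a subsequent element 5 alone yields 5 on the next firing.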

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3191#discussion_r98025995

          — Diff: docs/dev/windows.md —
          @@ -257,11 +273,11 @@ For example, in China you would have to specify an offset of `Time.hours(-8)`.

          ### Session Windows

          The session windows assigner groups elements by sessions of activity. Session windows do not overlap and
          -do not have a fixed start and end time in contrast to tumbling windows and sliding windows. Instead a
          -session window assigner closes a window when it does not receive elements for a certain period
          -of time, i.e., when a gap of inactivity occurred. A session window assigner is configured with the session gap which
          -defines how long the assigner waits until it closes the current session window and assigns following elements
          -to a new session window.
          +do not have a fixed start and end time, in contrast to tumbling windows and sliding windows. Instead a
          +session window closes when it does not receive elements for a certain period of time, i.e., when a gap of
          +inactivity occurred. A session window assigner is configured with the session gap which
          +defines the required period of inactivity. After this period expires, the current session closes
          — End diff –

          "After this period expires" -> "When this period expires"?

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3191#discussion_r98028880

          — Diff: docs/dev/windows.md —
          @@ -795,15 +821,17 @@ necessarily the ones that arrive first or last.

          ## Allowed Lateness

          -When working with event-time windowing it can happen that elements arrive late, i.e. the watermark that Flink uses to
          +When working with event-time windowing, it can happen that elements arrive late, i.e. the watermark that Flink uses to
          keep track of the progress of event-time is already past the end timestamp of a window to which an element belongs. See
          [event time](./event_time.html) and especially [late elements](./event_time.html#late-elements) for a more thorough
          discussion of how Flink deals with event time.

          -By default, late elements are dropped if their associated window was already evaluated. However,
          +By default, late elements are dropped when the watermark is past the end of the window. However,
          Flink allows you to specify a maximum allowed lateness for window operators. Allowed lateness
          -specifies by how much time elements can be late before they are dropped. Elements that arrive
          -within the allowed lateness of a window are still added to the window and trigger an immediate evaluation of the window which might emit elements.
          +specifies by how much time elements can be late before they are dropped, and its default value is 0.
          +Elements that arrive after the watermark is past the end of the window but before it passes the end of
          +window plus the allowed lateness, are still added to the window. Depending on the trigger used,
          — End diff –

          "passes the end of window" -> "*passed* the end of *the* window"
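The drop rule discussed in this hunk can be written down as a one-line check. This is a hedged sketch, not Flink's code: `LatenessCheck` is a hypothetical name, it assumes that the last timestamp belonging to a window `[start, end)` is `end - 1`, and that an element is dropped once the watermark has reached that timestamp plus the allowed lateness. Overflow for very large lateness values is ignored.

```java
// Hypothetical model of the allowed-lateness drop rule (not Flink's API).
final class LatenessCheck {
    // windowEnd is exclusive, so the window's last timestamp is windowEnd - 1.
    // Elements for the window are still added while the watermark is below
    // that timestamp plus the allowed lateness; afterwards they are dropped.
    static boolean isDropped(long windowEnd, long allowedLateness, long watermark) {
        long cleanupTime = (windowEnd - 1) + allowedLateness;
        return watermark >= cleanupTime;
    }
}
```

For a window [0, 10) with the default lateness of 0, elements are dropped as soon as the watermark reaches 9; with an allowed lateness of 5 they are still accepted at watermark 13 and dropped from watermark 14 on.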

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3191#discussion_r98028663

          — Diff: docs/dev/windows.md —
          @@ -795,15 +821,17 @@ necessarily the ones that arrive first or last.

          ## Allowed Lateness

          -When working with event-time windowing it can happen that elements arrive late, i.e. the watermark that Flink uses to
          +When working with event-time windowing, it can happen that elements arrive late, i.e. the watermark that Flink uses to
          keep track of the progress of event-time is already past the end timestamp of a window to which an element belongs. See
          [event time](./event_time.html) and especially [late elements](./event_time.html#late-elements) for a more thorough
          discussion of how Flink deals with event time.

          -By default, late elements are dropped if their associated window was already evaluated. However,
          +By default, late elements are dropped when the watermark is past the end of the window. However,
          Flink allows you to specify a maximum allowed lateness for window operators. Allowed lateness
          -specifies by how much time elements can be late before they are dropped. Elements that arrive
          -within the allowed lateness of a window are still added to the window and trigger an immediate evaluation of the window which might emit elements.
          +specifies by how much time elements can be late before they are dropped, and its default value is 0.
          +Elements that arrive after the watermark is past the end of the window but before it passes the end of
          — End diff –

          "after the watermark is past the end" -> "after the watermark passed the end"

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3191#discussion_r98027639

          — Diff: docs/dev/windows.md —
          @@ -692,30 +708,39 @@ input

          ## Triggers

          -A `Trigger` determines when a window (as formed by the `WindowAssigner`) is ready to be
          +A `Trigger` determines when a window (as formed by the window assigner) is ready to be
          processed by the window function. Each `WindowAssigner` comes with a default `Trigger`.
          If the default trigger does not fit your needs, you can specify a custom trigger using `trigger(...)`.

          -The trigger interface provides five methods that react to different events:
          +The trigger interface has five methods that allow a `Trigger` to react to different events:

          • The `onElement()` method is called for each element that is added to a window.
          • The `onEventTime()` method is called when a registered event-time timer fires.
          • The `onProcessingTime()` method is called when a registered processing-time timer fires.
          • The `onMerge()` method is relevant for stateful triggers and merges the states of two triggers when their corresponding windows merge, e.g. when using session windows.
          • Finally the `clear()` method performs any action needed upon removal of the corresponding window.

          Any of these methods can be used to register processing or event-time timers for future actions.
          +Two things to notice about the above methods are:
          +
          +1) The first three can return a `TriggerResult`, i.e. take action as a response to their corresponding event. The action can be one of the following:
          +* `CONTINUE`: do nothing,
          +* `FIRE`: trigger the computation,
          +* `PURGE`: clear the elements in the window, and
          +* `FIRE_AND_PURGE`: take both previous actions.
          — End diff –

          "trigger the computation and clear the elements in the window afterwards."?
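The four trigger actions can be illustrated with a small, self-contained Java sketch. This includes the reviewer's point that `FIRE_AND_PURGE` evaluates the window first and clears its elements afterwards. The sketch does not use Flink. `Action` is a hypothetical stand-in for Flink's `TriggerResult`, which has the same four constants and exposes `isFire()` and `isPurge()`. `ToyWindow` and `run` are illustrative names that only sum the integers a window buffers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Flink code: shows what each trigger action does to window contents.
public class TriggerResultSketch {

    // Mirrors the fire/purge flags of the four trigger actions.
    enum Action {
        CONTINUE(false, false),
        FIRE(true, false),
        PURGE(false, true),
        FIRE_AND_PURGE(true, true);

        final boolean fire;
        final boolean purge;

        Action(boolean fire, boolean purge) {
            this.fire = fire;
            this.purge = purge;
        }
    }

    // Toy window: buffers integers and "evaluates" by emitting their sum.
    static class ToyWindow {
        final List<Integer> contents = new ArrayList<>();
        final List<Integer> emitted = new ArrayList<>();

        void add(int value) { contents.add(value); }

        void apply(Action action) {
            if (action.fire) {
                // The computation runs first, over whatever the window currently holds...
                emitted.add(contents.stream().mapToInt(Integer::intValue).sum());
            }
            if (action.purge) {
                // ...and only afterwards are the elements cleared.
                contents.clear();
            }
        }
    }

    // Adds 1 and 2, applies the first action, adds 4 (e.g. a late element), applies the second.
    static List<Integer> run(Action first, Action second) {
        ToyWindow w = new ToyWindow();
        w.add(1);
        w.add(2);
        w.apply(first);
        w.add(4);
        w.apply(second);
        return w.emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(Action.FIRE, Action.FIRE));                     // [3, 7]
        System.out.println(run(Action.FIRE_AND_PURGE, Action.FIRE_AND_PURGE)); // [3, 4]
        System.out.println(run(Action.PURGE, Action.FIRE));                    // [4]
        System.out.println(run(Action.CONTINUE, Action.FIRE));                 // [7]
    }
}
```

The first two runs show the practical difference. With `FIRE`, the second evaluation still includes the elements from the first one. With `FIRE_AND_PURGE`, it only includes what arrived after the purge.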

          githubbot ASF GitHub Bot added a comment -

          Github user fhueske commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3191#discussion_r98025225

          — Diff: docs/dev/windows.md —
          @@ -38,6 +42,17 @@ the rest of the page.
          [.allowedLateness()] <- optional, else zero
          .reduce/fold/apply() <- required: "function"

          + <!-- html comment to separate the two snippets -->
          +
          + //-------------------- NON-KEYED STREAMS --------------------//
          — End diff –

          `NON-KEYED STREAMS` -> `NON-KEYED WINDOWS` as in the section heading below

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/3191

          Thanks @fhueske! I integrated your comments.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3191

          This looks very good now! @kl0u and @fhueske, could you please figure out how to squash the commits into one or two commits? Once that's done I'll update the Figures from the Source Material and merge.

          githubbot ASF GitHub Bot added a comment -

          GitHub user kl0u opened a pull request:

          https://github.com/apache/flink/pull/3224

          FLINK-5529 FLINK-4752 [docs] Improve / extends windowing documentation

          This PR is for both the issues in the title.
          It refactors/improves/extends the documentation of the windowing logic in Flink 1.2.

          This is the continuation of https://github.com/apache/flink/pull/3191

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/kl0u/flink fhu-window-docs

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3224.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3224


          commit 0c0cb89c143546cdb53a9cd6b6c6f9f9aa26f8eb
          Author: kl0u <kkloudas@gmail.com>
          Date: 2017-01-17T15:51:09Z

          FLINK-5529 [docs] Improve / extends windowing documentation

          commit de89fa32ace13908a899523b8e880cf25e9dc45b
          Author: Fabian Hueske <fhueske@apache.org>
          Date: 2017-01-27T10:48:26Z

          FLINK-4752 [docs] Improve window assigner documentation.


          githubbot ASF GitHub Bot added a comment -

          Github user kl0u closed the pull request at:

          https://github.com/apache/flink/pull/3191

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3224

          Thanks. 👍 I pushed this to both master and release-1.2 with my changes on top.

          Could you please close this PR and the two Jira issues?

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u closed the pull request at:

          https://github.com/apache/flink/pull/3224

          githubbot ASF GitHub Bot added a comment -

          Github user kl0u commented on the issue:

          https://github.com/apache/flink/pull/3224

          Done! Thanks a lot @aljoscha


            People

            • Assignee:
              kkl0u Kostas Kloudas
              Reporter:
              StephanEwen Stephan Ewen
            • Votes:
              0
              Watchers:
              4
