Flink / FLINK-5929

Allow Access to Per-Window State in ProcessWindowFunction

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: DataStream API
    • Labels: None

      Description

      Right now, the state that a WindowFunction or ProcessWindowFunction can access is scoped to the key of the window but not the window itself. That is, state is global across all windows for a given key.

      For some use cases it is beneficial to keep state scoped to a window. For example, if you expect several Trigger firings for the same window (due to early and late firings), a user can keep per-window state to carry information from one firing to the next.

      The per-window state has to be cleaned up in some way. For this I see two options:

      • Keep track of all state that a user uses and clean up when we reach the window GC horizon.
      • Add a cleanup() method to ProcessWindowFunction that is called when the window reaches its GC horizon, and in which users can (and should) clean up their state.
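The second option can be sketched as follows. This is a simplified, hypothetical model rather than Flink's WindowOperator: it assumes event-time windows whose GC horizon is the window's max timestamp plus the allowed lateness, and it uses a plain `LongConsumer` as a stand-in for the proposed cleanup() method. Once the watermark passes a window's horizon, the callback runs exactly once and the window is forgotten.

```java
import java.util.Iterator;
import java.util.TreeSet;
import java.util.function.LongConsumer;

/** Hypothetical model of "option 2": a cleanup callback fired once at each window's GC horizon. */
public class GcHorizonDemo {
    private final long allowedLateness;
    private final LongConsumer cleanup;                          // stands in for the proposed cleanup()
    private final TreeSet<Long> activeWindows = new TreeSet<>(); // windows identified by max timestamp

    public GcHorizonDemo(long allowedLateness, LongConsumer cleanup) {
        this.allowedLateness = allowedLateness;
        this.cleanup = cleanup;
    }

    /** GC horizon: max timestamp plus allowed lateness, saturating at Long.MAX_VALUE on overflow. */
    public long cleanupTime(long windowMaxTimestamp) {
        long horizon = windowMaxTimestamp + allowedLateness;
        return horizon >= windowMaxTimestamp ? horizon : Long.MAX_VALUE;
    }

    public void registerWindow(long windowMaxTimestamp) {
        activeWindows.add(windowMaxTimestamp);
    }

    /** Advances event time; every window whose horizon has been reached is cleaned up exactly once. */
    public void onWatermark(long watermark) {
        Iterator<Long> it = activeWindows.iterator();
        while (it.hasNext()) {
            long maxTimestamp = it.next();
            if (cleanupTime(maxTimestamp) <= watermark) {
                cleanup.accept(maxTimestamp);
                it.remove(); // forget the window so it is never cleaned up twice
            }
        }
    }
}
```

The explicit callback keeps the user in charge of which state gets cleared, which is the trade-off against option 1, where the framework would have to track every piece of state a user touched.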

      On the API side, we can add a method windowState() on ProcessWindowFunction.Context that retrieves the per-window state and globalState() that would allow access to the (already available) global state. The Context would then look like this:

      /**
       * The context holding window metadata.
       */
      public abstract class Context {
          /**
           * @return The window that is being evaluated.
           */
          public abstract W window();
      
          /**
           * State accessor for per-key and per-window state.
           */
          public abstract KeyedStateStore windowState();
      
          /**
           * State accessor for per-key global state.
           */
          public abstract KeyedStateStore globalState();
      }
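A hedged usage sketch of the proposed Context follows. It assumes a drastically simplified, hypothetical `CounterStore` in place of Flink's `KeyedStateStore` (which hands out `ValueState`, `ListState`, and so on via state descriptors). The user logic counts firings for the current window through `windowState()` and across all windows of the key through `globalState()`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the proposed Context; CounterStore is NOT Flink's KeyedStateStore. */
public class ContextSketch {

    /** Simplified state store: named long counters that default to 0. */
    public interface CounterStore {
        long get(String name);
        void put(String name, long value);
    }

    /** Map-backed store; one instance per scope (per window, or per key). */
    public static final class MapStore implements CounterStore {
        private final Map<String, Long> values = new HashMap<>();

        @Override
        public long get(String name) { return values.getOrDefault(name, 0L); }

        @Override
        public void put(String name, long value) { values.put(name, value); }
    }

    /** Same shape as the proposed Context: window(), windowState(), globalState(). */
    public abstract static class Context<W> {
        public abstract W window();
        public abstract CounterStore windowState();
        public abstract CounterStore globalState();
    }

    /** Builds a context for one evaluation of a window. */
    public static <W> Context<W> contextFor(W window, CounterStore windowStore, CounterStore globalStore) {
        return new Context<W>() {
            @Override public W window() { return window; }
            @Override public CounterStore windowState() { return windowStore; }
            @Override public CounterStore globalState() { return globalStore; }
        };
    }

    /** Example user logic, called once per trigger firing. */
    public static String process(Context<String> ctx) {
        long perWindow = ctx.windowState().get("firings") + 1; // scoped to this window
        ctx.windowState().put("firings", perWindow);
        long perKey = ctx.globalState().get("firings") + 1;    // shared by all windows of the key
        ctx.globalState().put("firings", perKey);
        return ctx.window() + ": firing " + perWindow + " (" + perKey + " for this key)";
    }
}
```

With real per-window state, the counter kept under `windowState()` would also have to be cleared when the window is garbage-collected, which is what the cleanup discussion above is about.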
      


          Activity

          rehevkor5 Shannon Carey added a comment (edited)

          Just to clarify, you're referring to the state from AbstractRichFunction#getRuntimeContext.getState(), right? That's the state keyed by the combination of event key and window operator (that's what you mean by "global"). What you're suggesting is adding state for the individual window panes, right?

          rehevkor5 Shannon Carey added a comment -

          If I understand correctly, I agree this would be useful. Currently we are working around this limitation in order to achieve communication between the Trigger (per-pane state) and the WindowFunction (per-operator state) by a hack within the WindowFunction that looks like this (we're not on 1.2 yet so we haven't looked at new ways to do this yet):

            def apply(key: String, window: TimeWindow, input, out): Unit = {
              val fireTimestampState: ValueState[java.lang.Long] =
                getRuntimeContext.getState[java.lang.Long](fireTimestampStateDescriptor)
          
              if (fireTimestampState.isInstanceOf[MemValueState[String, TimeWindow, java.lang.Long]]) {
                fireTimestampState.asInstanceOf[MemValueState[String, TimeWindow, java.lang.Long]].setCurrentNamespace(window)
              } else if (fireTimestampState.isInstanceOf[RocksDBValueState[String, TimeWindow, java.lang.Long]]) {
                fireTimestampState.asInstanceOf[RocksDBValueState[String, TimeWindow, java.lang.Long]].setCurrentNamespace(window)
              } else if (fireTimestampState.isInstanceOf[FsValueState[String, TimeWindow, java.lang.Long]]) {
                fireTimestampState.asInstanceOf[FsValueState[String, TimeWindow, java.lang.Long]].setCurrentNamespace(window)
              }
              fireTimestampState.value()
              ...
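The workaround above relies on Flink's internal keyed state being scoped by a namespace (for window state, the window itself), with `setCurrentNamespace` switching which namespace a single state handle reads and writes. The hypothetical model below (plain Java, not Flink's classes) shows that mechanism: one handle, two windows, independent values. It also hints at why the cast chain is brittle: the namespace setter lives on backend-specific classes rather than on the public `ValueState` interface.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of a namespaced value state; not Flink's implementation. */
public class NamespacedValueState<K, N, V> {
    // Backing table: key -> namespace (e.g. window) -> value.
    private final Map<K, Map<N, V>> table = new HashMap<>();
    private K currentKey;
    private N currentNamespace;

    public void setCurrentKey(K key) { this.currentKey = key; }

    public void setCurrentNamespace(N namespace) { this.currentNamespace = namespace; }

    /** Reads the value for the current (key, namespace), or null if absent. */
    public V value() {
        Map<N, V> byNamespace = table.get(currentKey);
        return byNamespace == null ? null : byNamespace.get(currentNamespace);
    }

    /** Writes the value for the current (key, namespace) only. */
    public void update(V value) {
        table.computeIfAbsent(currentKey, k -> new HashMap<>()).put(currentNamespace, value);
    }

    /** Drops only the current (key, namespace) entry, e.g. when a window is purged. */
    public void clear() {
        Map<N, V> byNamespace = table.get(currentKey);
        if (byNamespace != null) {
            byNamespace.remove(currentNamespace);
            if (byNamespace.isEmpty()) {
                table.remove(currentKey);
            }
        }
    }
}
```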
          
          sjwiesman Seth Wiesman added a comment -

          Aljoscha Krettek Yes, this does look like what we were discussing. Regarding a cleanup method vs. using the GC horizon: as a user, I would expect a cleanup method, to be consistent with the trigger context. I already have a rudimentary version of this implemented for my own use, so I would be happy to take this ticket, clean up my code, and submit a PR.

          sjwiesman Seth Wiesman added a comment -

          Shannon Carey Yes, state for individual window panes.

          githubbot ASF GitHub Bot added a comment -

          GitHub user uce opened a pull request:

          https://github.com/apache/flink/pull/3427

          FLINK-5929 [tests] Fix SavepointITCase instability

          When shutting down the testing cluster, checkpoint files could linger (for checkpoints independent of the savepoint).

          This commit deactivates checkpointing for the test and uses count-down latches to track progress, which also reduces the test time.

          I've triggered multiple Travis builds. I will merge this if they build without the `SavepointITCase` failing.
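The general pattern the PR describes, tracking test progress with count-down latches instead of relying on timing, looks roughly like this in plain Java. The worker tasks and counts are hypothetical placeholders, not the actual SavepointITCase code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration of latch-based progress tracking; not the actual SavepointITCase code. */
public class LatchProgressDemo {

    /** Runs {@code tasks} workers and waits until each has signalled, failing after the timeout. */
    public static int runAndAwait(int tasks, long timeoutMillis) {
        CountDownLatch progress = new CountDownLatch(tasks);
        AtomicInteger completed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    completed.incrementAndGet(); // the work the test is waiting for
                    progress.countDown();        // explicit progress signal, no sleeps or polling
                });
            }
            // Returns as soon as all tasks have signalled; fails fast if they never do.
            if (!progress.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException(progress.getCount() + " task(s) did not report progress in time");
            }
            return completed.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            pool.shutdownNow();
        }
    }
}
```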

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/uce/flink 5923-savepoint_it_case

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3427.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3427


          commit e0c46f34f55bfee743236c4042be4b2501436811
          Author: Ufuk Celebi <uce@apache.org>
          Date: 2017-02-28T10:13:28Z

          FLINK-5929 [tests] Fix SavepointITCase instability

          When shutting down the testing cluster it can happen that checkpoint
          files lingered around (checkpoints independent of the savepoint).

          This commit deactivates checkpointing for the test and uses count down
          latches to track progress, which also reduces the test time.


          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3427

          Wrong Jira Issue ID. 😅

          aljoscha Aljoscha Krettek added a comment -

          Shannon Carey Yes, as Seth mentioned the global refers to "not-window-scoped" as opposed to "global for the whole job".

          Seth Wiesman I'd be very happy if you contributed your code, yes.

          I'm also in favour of the clear()/cleanup() method because Flink, so far, has erred on the side of being explicit. And, as you said, this is apparent in the way Triggers work.

          aljoscha Aljoscha Krettek added a comment -

          Shannon Carey Ah forgot to mention, in your workaround you could never clean up your state, right? Because of the lack of a clear() method for the WindowFunction.

          sjwiesman Seth Wiesman added a comment -

          Aljoscha Krettek Curious: why is ProcessWindowFunction wrapped in a WindowFunction before being passed to the WindowOperator, as opposed to the other way around, if ProcessWindowFunction is the next-gen window function?

          rehevkor5 Shannon Carey added a comment -

          Aljoscha Krettek as far as I am aware, the state does get cleared out by our Trigger. In Trigger#clear() we have: ctx.getPartitionedState(fireTimestampStateDescriptor).clear(); We could have done it in the Window Function instead, if we wanted to, given our hack.

          aljoscha Aljoscha Krettek added a comment -

          Shannon Carey Ahh, I didn't see that you're accessing the state that the trigger is keeping (and cleaning up).

          aljoscha Aljoscha Krettek added a comment -

          Seth Wiesman AFAIK, it's wrapped in an InternalWindowFunction. It has this name for historical reasons but we can easily change the name and the method signature because it's not public API.

          Does this help?
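The wrapping described here is an adapter: the operator only talks to an internal interface, and a small wrapper translates each internal call into a call on the user's function. The sketch below uses hypothetical, simplified interfaces (`InternalFn`, `UserProcessFn`) rather than Flink's real signatures; it shows why the internal side can be renamed or re-shaped freely while the public side stays stable.

```java
import java.util.List;

/** Hypothetical sketch of the wrapping described above; these are not Flink's real interfaces. */
public class WrapperSketch {

    /** Public, user-facing contract: must stay stable across releases. */
    public interface UserProcessFn<IN, OUT> {
        void process(String key, String window, Iterable<IN> elements, List<OUT> out);
    }

    /** Internal contract the operator calls; not public API, so name and signature may change. */
    public interface InternalFn<IN, OUT> {
        void apply(String window, String key, Iterable<IN> input, List<OUT> out);
    }

    /** Adapter: presents a user function through the internal interface. */
    public static <IN, OUT> InternalFn<IN, OUT> wrap(UserProcessFn<IN, OUT> user) {
        // Translate the internal argument order into the public one.
        return (window, key, input, out) -> user.process(key, window, input, out);
    }
}
```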

          aljoscha Aljoscha Krettek added a comment -

          Seth Wiesman One more thing: we can only allow access to per-window state for non-merging windows, for now. For merging windows, the window function would need a method for merging state, similar to Trigger.merge() and I don't want to give that to users just yet because it might seem a bit overwhelming. What do you think?
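To see why merging windows need an extra hook, consider session windows: when two sessions merge, any per-window state kept for them must be folded into the state of the merged window, much as trigger state is merged for merging windows. The hypothetical sketch below models this for a per-window counter, with windows as [start, end) intervals; the names are illustrative, not Flink API.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of merging per-window state when two windows merge. */
public class MergingStateSketch {

    /** A [start, end) time window; equality by bounds so it can key a map. */
    public static final class Window {
        final long start;
        final long end;

        public Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        /** Smallest window covering both this and the other window. */
        Window cover(Window other) {
            return new Window(Math.min(start, other.start), Math.max(end, other.end));
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Window)) return false;
            Window w = (Window) o;
            return start == w.start && end == w.end;
        }

        @Override
        public int hashCode() {
            return 31 * Long.hashCode(start) + Long.hashCode(end);
        }
    }

    private final Map<Window, Long> countPerWindow = new HashMap<>();

    public void add(Window w, long n) { countPerWindow.merge(w, n, Long::sum); }

    public long count(Window w) { return countPerWindow.getOrDefault(w, 0L); }

    /** Without this step, the state of the two source windows would simply be lost. */
    public Window merge(Window a, Window b) {
        Window merged = a.cover(b);
        long combined = count(a) + count(b);
        countPerWindow.remove(a);
        countPerWindow.remove(b);
        countPerWindow.put(merged, combined);
        return merged;
    }
}
```

For a counter, "merge" is just addition; arbitrary user state has no such obvious rule, which is why exposing a merge method to users is a larger API decision.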

          sjwiesman Seth Wiesman added a comment (edited)

          Aljoscha Krettek That seems like a reasonable first step. For now, would it be acceptable to throw an exception if users try to access state in a merging window? Or should it return a KeyedStateStore that throws if you try to use it?

          sjwiesman Seth Wiesman added a comment -

          Aljoscha Krettek I have a version of this that I feel pretty good about. I'm working on my test coverage, but I am not sure where the appropriate place for the tests would be. I have some in WindowOperatorTest and am working on some integration tests. What else do you think makes sense for this type of feature?

          aljoscha Aljoscha Krettek added a comment -

          Seth Wiesman The best place to put new tests for the WindowOperator is WindowOperatorContractTest, which has low-level tests for specific behaviours of the WindowOperator. Ideally, we would not have WindowOperatorTest anymore because it is a cruder way of testing: it simply throws things at the operator and checks whether the result is correct, whereas the tests in the contract test are more specific.

          aljoscha Aljoscha Krettek added a comment -

          Sorry, overlooked the other comment. Yes, I think throwing an Exception is good for now. For the tests, I think we're good if we have solid tests in WindowOperatorContractTest.

          githubbot ASF GitHub Bot added a comment -

          GitHub user sjwiesman opened a pull request:

          https://github.com/apache/flink/pull/3479

          FLINK-5929 Allow Access to Per-Window State in ProcessWindowFunction

          Right now, the state that a WindowFunction or ProcessWindowFunction can
          access is scoped to the key of the window but not the window itself.
          That is, state is global across all windows for a given key.
          For some use cases it is beneficial to keep state scoped to a window.
          For example, if you expect to have several Trigger firings (due to early
          and late firings) a user can keep state per window to keep some
          information between those firings.

          @aljoscha

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/sjwiesman/flink FLINK-5929

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3479.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3479


          commit 623edd1fb107e8dd0aae755a7b252df1f91713bd
          Author: Seth Wiesman <swiesman@mediamath.com>
          Date: 2017-03-06T04:07:18Z

          FLINK-5929 Allow Access to Per-Window State in ProcessWindowFunction

          Right now, the state that a WindowFunction or ProcessWindowFunction can
          access is scoped to the key of the window but not the window itself.
          That is, state is global across all windows for a given key.
          For some use cases it is beneficial to keep state scoped to a window.
          For example, if you expect to have several Trigger firings (due to early
          and late firings) a user can keep state per window to keep some
          information between those firings.


          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3479

          Thanks @sjwiesman! I'll have a look.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3479

          One quick initial remark: instead of each time having an anonymous inner class for the `Context` you can create a reusable class for that like this:
          ```
          class InternalProcessWindowContext<IN, OUT, KEY, W extends Window>
                  extends ProcessWindowFunction<IN, OUT, KEY, W>.Context {

              W window;
              InternalWindowFunction.InternalWindowContext internalContext;

              InternalProcessWindowContext(ProcessWindowFunction<IN, OUT, KEY, W> function) {
                  function.super();
              }

              @Override
              public W window() {
                  return window;
              }

              @Override
              public KeyedStateStore windowState() {
                  return internalContext.windowState();
              }

              @Override
              public KeyedStateStore globalState() {
                  return internalContext.globalState();
              }
          }
          ```

          The `function.super()` call in there makes it work even though `Context` is itself defined as an inner abstract class of `ProcessWindowFunction`. It's a bit of black magic and not really too well known, I think. 😉
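The `function.super()` trick is standard Java (a qualified superclass constructor invocation): a class that extends a non-static inner class must supply the enclosing instance for its superclass constructor, and `outer.super()` does exactly that from outside the enclosing class. A minimal, self-contained illustration with hypothetical names, where `Fn` plays the role of ProcessWindowFunction and `Fn.Context` its inner Context:

```java
/** Hypothetical stand-ins: Fn ~ ProcessWindowFunction, Fn.Context ~ its inner Context. */
public class QualifiedSuperDemo {

    public abstract static class Fn {
        final String name;

        protected Fn(String name) { this.name = name; }

        /** Non-static inner class: every Context instance is bound to an enclosing Fn. */
        public abstract class Context {
            public abstract String window();

            /** Inner classes can read the enclosing instance's fields (Fn.this.name). */
            public String describe() { return name + " evaluating " + window(); }
        }
    }

    /** Not nested inside Fn, so it must pass the enclosing instance explicitly. */
    public static class ReusableContext extends Fn.Context {
        private String window;

        public ReusableContext(Fn function) {
            function.super(); // binds this Context to `function` as its enclosing instance
        }

        /** Reuse the same context object across evaluations by swapping the window. */
        public ReusableContext setWindow(String window) {
            this.window = window;
            return this;
        }

        @Override
        public String window() { return window; }
    }
}
```

Because one `ReusableContext` is created per function and only its fields change between invocations, this avoids allocating an anonymous Context on every call, which is the point of the suggestion.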

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3479#discussion_r104432699

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
          @@ -629,6 +645,135 @@ protected final boolean isCleanupTime(W window, long time) {
               }
           
               /**
          +     * For now keyed state is not allowed in ProcessWindowFunctions
          +     */
          +    public class MergingKeyStore implements KeyedStateStore {
          +
          +        protected W window;
          +
          +        @Override
          +        public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
          +            throw new RuntimeException("keyedState is not allowed in merging windows");
          +        }
          +
          +        @Override
          +        public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
          +            throw new RuntimeException("keyedState is not allowed in merging windows");
          +        }
          +
          +        @Override
          +        public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
          +            throw new RuntimeException("keyedState is not allowed in merging windows");
          +        }
          +
          +        @Override
          +        public <T, A> FoldingState<T, A> getFoldingState(FoldingStateDescriptor<T, A> stateProperties) {
          +            throw new RuntimeException("keyedState is not allowed in merging windows");
          +        }
          +
          +        @Override
          +        public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
          +            throw new RuntimeException("keyedState is not allowed in merging windows");
          +        }
          +    }
          +
          +    public class WindowPaneKeyStore implements KeyedStateStore {
          +
          +        protected W window;
          +
          +        @Override
          +        public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
          +            try {
          +                return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
          +            } catch (Exception e) {
          +                throw new RuntimeException("Could not retrieve state", e);
          +            }
          +        }
          +
          +        @Override
          +        public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
          +            try {
          +                return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
          +            } catch (Exception e) {
          +                throw new RuntimeException("Could not retrieve state", e);
          +            }
          +        }
          +
          +        @Override
          +        public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
          +            try {
          +                return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
          +            } catch (Exception e) {
          +                throw new RuntimeException("Could not retrieve state", e);
          +            }
          +        }
          +
          +        @Override
          +        public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
          +            try {
          +                return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
          +            } catch (Exception e) {
          +                throw new RuntimeException("Could not retrieve state", e);
          +            }
          +        }
          +
          +        @Override
          +        public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
          +            try {
          +                return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
          +            } catch (Exception e) {
          +                throw new RuntimeException("Could not retrieve state", e);
          +            }
          +        }
          +    }
          +
          +    /**
          +     * {@code WindowContext} is a utility for handling {@code ProcessWindowFunction} invocations. It can be reused
          +     * by setting the {@code key} and {@code window} fields. No internal state must be kept in
          +     * the {@code WindowContext}
          +     */
          +    public class WindowContext implements InternalWindowFunction.InternalWindowContext {
          +        protected W window;
          +
          +        protected WindowPaneKeyStore windowPaneKeyStore;
          +        protected MergingKeyStore mergingKeyStore;
          +
          +        public WindowContext(W window) {
          +            this.window = window;
          +            this.windowPaneKeyStore = new WindowPaneKeyStore();
          +            this.mergingKeyStore = new MergingKeyStore();
          +        }
          +
          +        @Override
          +        public String toString() {
          +            return "WindowContext {Window = " + window.toString() + "} ";
          +        }
          +
          +        public void clear() throws Exception {
          +            userFunction.clear(window, this);
          +        }
          +
          +        @Override
          +        public KeyedStateStore windowState() {
          +            if (windowAssigner instanceof MergingWindowAssigner) {
          --- End diff ---

          Instead of checking every time you could initialise the `WindowContext` with either a `WindowStateStore` or the (exception throwing) `MergingStateStore` at the beginning.
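The suggestion amounts to deciding once, when the context is built, which store implementation it hands out, instead of re-running an `instanceof` check on every call. A hypothetical sketch with simplified types: a tiny `Store` interface replaces Flink's full `KeyedStateStore`, a boolean replaces the `windowAssigner instanceof MergingWindowAssigner` check, and `UnsupportedOperationException` (a `RuntimeException`) is an illustrative choice of exception.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: pick the store once at construction; not Flink's WindowOperator code. */
public class StoreSelectionSketch {

    /** Reduced stand-in for KeyedStateStore. */
    public interface Store {
        Map<String, Object> state();
    }

    /** Normal case: per-window-pane state backed by a map. */
    static final class PaneStore implements Store {
        private final Map<String, Object> values = new HashMap<>();

        @Override
        public Map<String, Object> state() { return values; }
    }

    /** Merging windows: per-window state is not supported yet, so every access fails. */
    static final class RejectingStore implements Store {
        @Override
        public Map<String, Object> state() {
            throw new UnsupportedOperationException("Per-window state is not supported for merging windows.");
        }
    }

    public static final class WindowContext {
        private final Store windowStore;

        /** The choice is made once, here, instead of on every windowState() call. */
        public WindowContext(boolean mergingAssigner) {
            this.windowStore = mergingAssigner ? new RejectingStore() : new PaneStore();
        }

        public Store windowState() { return windowStore; } // no per-call instanceof check
    }
}
```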

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3479#discussion_r104432787

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java —
          @@ -629,6 +645,135 @@ protected final boolean isCleanupTime(W window, long time) {
          }

          /**
          + * For now keyed state is not allowed in ProcessWindowFunctions
          + */
          + public class MergingKeyStore implements KeyedStateStore {
          +
          + protected W window;
          +
          + @Override
          + public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
          + throw new RuntimeException("keyedState is not allowed in merging windows");
          + }
          +
          + @Override
          + public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
          + throw new RuntimeException("keyedState is not allowed in merging windows");
          + }
          +
          + @Override
          + public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
          + throw new RuntimeException("keyedState is not allowed in merging windows");
          + }
          +
          + @Override
          + public <T, A> FoldingState<T, A> getFoldingState(FoldingStateDescriptor<T, A> stateProperties) {
          + throw new RuntimeException("keyedState is not allowed in merging windows");
          + }
          +
          + @Override
          + public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
          + throw new RuntimeException("keyedState is not allowed in merging windows");
          + }
          + }
          +
          + public class WindowPaneKeyStore implements KeyedStateStore {
          +
          + protected W window;
          +
          + @Override
          + public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
          + try {
          + return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
          + } catch (Exception e) {
          + throw new RuntimeException("Could not retrieve state", e);
          + }
          + }
          +
          + @Override
          + public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
          + try {
          + return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
          + } catch (Exception e) {
          + throw new RuntimeException("Could not retrieve state", e);
          + }
          + }
          +
          + @Override
          + public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
          + try {
          + return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
          + } catch (Exception e) {
          + throw new RuntimeException("Could not retrieve state", e);
          + }
          + }
          +
          + @Override
          + public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
          + try {
          + return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
          + } catch (Exception e) {
          + throw new RuntimeException("Could not retrieve state", e);
          + }
          + }
          +
          + @Override
          + public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
          + try {
          + return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
          + } catch (Exception e) {
          + throw new RuntimeException("Could not retrieve state", e);
          + }
          + }
          + }
          +
          + /**
          + * {@code WindowContext} is a utility for handling {@code ProcessWindowFunction} invocations. It can be reused
          + * by setting the {@code key} and {@code window} fields. No internal state must be kept in
          + * the {@code WindowContext}
          + */
          + public class WindowContext implements InternalWindowFunction.InternalWindowContext {
          + protected W window;
          +
          + protected WindowPaneKeyStore windowPaneKeyStore;
          + protected MergingKeyStore mergingKeyStore;
          +
          + public WindowContext(W window) {
          + this.window = window;
          + this.windowPaneKeyStore = new WindowPaneKeyStore();
          + this.mergingKeyStore = new MergingKeyStore();
          + }
          +
          + @Override
          + public String toString() {
          + return "WindowContext {Window = " + window.toString() + "} ";
          + }
          +
          + public void clear() throws Exception {
          + userFunction.clear(window, this);
          + }
          +
          + @Override
          + public KeyedStateStore windowState() {
          + if (windowAssigner instanceof MergingWindowAssigner) {
          + return mergingKeyStore;
          + } else {
          + this.windowPaneKeyStore.window = window;
          + return this.windowPaneKeyStore;
          + }
          + }
          +
          + @Override
          + public KeyedStateStore globalState() {
          + if (windowAssigner instanceof MergingWindowAssigner) {
          — End diff –

          I think access to global state is fine for merging windows.
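          The reasoning behind this comment can be illustrated with a toy model (not Flink code). It assumes keyed state is addressed by a `(key, namespace)` pair: per-window state uses the window as namespace, and global state uses one fixed namespace (similar in spirit to Flink's `VoidNamespace`). `SimpleWindow` and `ToyStateBackend` are hypothetical names used only for this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for a mergeable window [start, end); not Flink's TimeWindow.
final class SimpleWindow {
    final long start;
    final long end;

    SimpleWindow(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // Smallest window covering both, as a merging (e.g. session) assigner would produce.
    SimpleWindow cover(SimpleWindow other) {
        return new SimpleWindow(Math.min(start, other.start), Math.max(end, other.end));
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SimpleWindow)) {
            return false;
        }
        SimpleWindow w = (SimpleWindow) o;
        return w.start == start && w.end == end;
    }

    @Override
    public int hashCode() {
        return Objects.hash(start, end);
    }
}

// Hypothetical toy state backend: every value is addressed by (key, namespace).
// Per-window state uses the window as namespace; global state uses one fixed namespace.
final class ToyStateBackend {
    static final Object GLOBAL = new Object();

    private final Map<String, Map<Object, Long>> values = new HashMap<>();

    Long get(String key, Object namespace) {
        Map<Object, Long> perKey = values.get(key);
        return perKey == null ? null : perKey.get(namespace);
    }

    void put(String key, Object namespace, long value) {
        values.computeIfAbsent(key, k -> new HashMap<>()).put(namespace, value);
    }
}
```

          For example, after writing values for key `k` under `[0, 10)`, `[5, 15)` and `GLOBAL`, a lookup under the merged window `[0, 15)` returns `null` (the old entries are stranded under namespaces that no longer exist), while the `GLOBAL` lookup is unaffected. That is why per-window state needs special handling for merging assigners, whereas global state does not.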

          githubbot ASF GitHub Bot added a comment -

          Github user sjwiesman commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3479#discussion_r104458722

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java —
          @@ -39,5 +40,31 @@

          * @param out A collector for emitting elements.
          * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
          */
          + @Deprecated
          void apply(KEY key, W window, IN input, Collector<OUT> out) throws Exception;
          — End diff –

          @aljoscha I meant to ask, should I leave this method or remove it?

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3479#discussion_r104470521

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java —
          @@ -39,5 +40,31 @@

          * @param out A collector for emitting elements.
          * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
          */
          + @Deprecated
          void apply(KEY key, W window, IN input, Collector<OUT> out) throws Exception;
          — End diff –

          Ah, I meant to actually write that earlier. Yes: please remove. 😅

          githubbot ASF GitHub Bot added a comment -

          Github user sjwiesman commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3479#discussion_r104492162

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java —
          @@ -39,5 +40,31 @@

          * @param out A collector for emitting elements.
          * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
          */
          + @Deprecated
          void apply(KEY key, W window, IN input, Collector<OUT> out) throws Exception;
          — End diff –

          I noticed an issue when removing apply. The method is used inside AccumulatingKeyedTimePanes, which takes an AbstractStreamOperator as an argument to its evaluateWindow method. When creating the context I can get the global keyed state backend from the operator, but not the partitioned state, because those methods are protected. The only two users of this class are its subclasses, both of which have been deprecated. My question is: should I modify the evaluateWindow method to accept a keyed state store that wraps the operator's partitioned state, or just throw an exception on context.windowState(), since all valid uses of this method have been deprecated?

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3479#discussion_r104504635

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java —
          @@ -39,5 +40,31 @@

          * @param out A collector for emitting elements.
          * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
          */
          + @Deprecated
          void apply(KEY key, W window, IN input, Collector<OUT> out) throws Exception;
          — End diff –

          I think for now it's OK to throw an Exception here.
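          A minimal sketch of the agreed behaviour, using hypothetical simplified types rather than Flink's `KeyedStateStore`: in the deprecated pane-based code path, the context hands out global state normally but fails fast when per-window state is requested. `StateStore`, `WindowFunctionContext` and `PaneContext` are illustrative names, and the choice of `UnsupportedOperationException` is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, heavily simplified stand-in for a keyed state store.
interface StateStore {
    Map<String, Object> values();
}

// Hypothetical simplified context offered to window functions.
interface WindowFunctionContext {
    StateStore windowState();

    StateStore globalState();
}

// Context for the deprecated pane-based operators: only global state is reachable.
final class PaneContext implements WindowFunctionContext {
    private final StateStore global;

    PaneContext(StateStore global) {
        this.global = global;
    }

    @Override
    public StateStore windowState() {
        // Partitioned (per-window) state cannot be obtained on this code path,
        // so fail fast instead of silently handing out global state.
        throw new UnsupportedOperationException(
                "Per-window state is not supported by this deprecated operator.");
    }

    @Override
    public StateStore globalState() {
        return global;
    }
}
```

          Failing loudly keeps the deprecated operators compiling against the new context interface without pretending to offer per-window scoping they cannot provide.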

          githubbot ASF GitHub Bot added a comment -

          Github user sjwiesman commented on the issue:

          https://github.com/apache/flink/pull/3479

          @aljoscha I made the changes you asked for. Just a heads up: a number of files were changed only superficially when migrating from apply -> process but are otherwise untouched.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3479#discussion_r105660519

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java —
          @@ -153,6 +161,8 @@

          protected transient Context context = new Context(null, null);

          + protected transient WindowContext windowContext = new WindowContext(null);
          — End diff –

          I think to make it more clear what they do we should rename these two contexts to `triggerContext` and `processContext`.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3479#discussion_r105662139

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java —
          @@ -92,12 +93,45 @@ public void process(K key, final Context context, Iterable<T> values, Collector<
          result = foldFunction.fold(result, val);
          }

          - windowFunction.process(key, windowFunction.new Context() {
          + ProcessWindowFunction<ACC, R, K, W>.Context ctx = windowFunction.new Context() {
          — End diff –

          This can benefit from a similar refactoring as the internal window functions, i.e. creating an internal context class instead of the anonymous inner classes.

          This also holds for `FoldApplyProcessAllWindowFunction`, `ReduceApplyProcessAllWindowFunction` and `ReduceApplyProcessWindowFunction`.
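          The suggested refactoring replaces a `new Context() {...}` allocation on every invocation with one context object created up front, whose fields are updated before each call. A minimal sketch of that pattern, with hypothetical simplified types (`UserFunction`, `ReusableContext`, `FoldThenApply`) standing in for `ProcessWindowFunction` and the fold/reduce wrappers:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified user function: receives a context and the pre-aggregated value.
interface UserFunction<W, R> {
    void process(Ctx<W> ctx, long aggregate, List<R> out);

    // Simplified context exposing only the current window.
    interface Ctx<W> {
        W window();
    }
}

// One mutable context instance, reused across invocations.
final class ReusableContext<W> implements UserFunction.Ctx<W> {
    W window;

    @Override
    public W window() {
        return window;
    }
}

// Hypothetical wrapper: folds the input (here: a sum), then delegates to the user function.
final class FoldThenApply<W, R> {
    private final UserFunction<W, R> userFunction;
    private final ReusableContext<W> ctx = new ReusableContext<>(); // allocated once

    FoldThenApply(UserFunction<W, R> userFunction) {
        this.userFunction = userFunction;
    }

    void process(W window, Iterable<Long> values, List<R> out) {
        long sum = 0;
        for (long v : values) {
            sum += v;
        }
        ctx.window = window; // update the shared context instead of creating a new one
        userFunction.process(ctx, sum, out);
    }
}
```

          The trade-off is that the user function must not hold on to the context after the call returns, since its fields change on the next invocation.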

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3479#discussion_r105661210

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java —
          @@ -628,6 +644,123 @@ protected final boolean isCleanupTime(W window, long time) {
          return time == cleanupTime(window);
          }

          + public abstract class KeyedStateStoreWithWindow implements KeyedStateStore {
          — End diff –

          Maybe add a comment explaining that we have a base class on which the window can be set, so that we can create a `MergingKeyStore` or a `WindowPaneKeyStore` once (depending on the window assigner) and then no longer care about the distinction.

          I remember suggesting this, but now I struggled to see why the base class is there. 😉
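          The idea behind the base class can be sketched as follows: the store implementation is chosen once, based on whether the assigner merges windows, and afterwards the caller only sets the window before each use. All names here (`WindowScopedStore`, `PaneStore`, `RejectingStore`, `StoreFactory`) are hypothetical simplifications; in the PR the real classes implement `KeyedStateStore` and delegate to `getPartitionedState`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical base class: all subclasses share a settable window field.
abstract class WindowScopedStore<W> {
    protected W window;

    void setWindow(W window) {
        this.window = window;
    }

    abstract Long get(String name);

    abstract void put(String name, long value);
}

// Per-window store for non-merging assigners: entries are scoped by (window, name).
final class PaneStore<W> extends WindowScopedStore<W> {
    private final Map<W, Map<String, Long>> state = new HashMap<>();

    @Override
    Long get(String name) {
        Map<String, Long> perWindow = state.get(window);
        return perWindow == null ? null : perWindow.get(name);
    }

    @Override
    void put(String name, long value) {
        state.computeIfAbsent(window, w -> new HashMap<>()).put(name, value);
    }
}

// Store for merging assigners: per-window state is rejected.
final class RejectingStore<W> extends WindowScopedStore<W> {
    @Override
    Long get(String name) {
        throw new UnsupportedOperationException("Per-window state is not allowed in merging windows.");
    }

    @Override
    void put(String name, long value) {
        throw new UnsupportedOperationException("Per-window state is not allowed in merging windows.");
    }
}

final class StoreFactory {
    // Decided once (e.g. when the operator is set up) instead of on every windowState() call.
    static <W> WindowScopedStore<W> createStore(boolean mergingAssigner) {
        if (mergingAssigner) {
            return new RejectingStore<>();
        }
        return new PaneStore<>();
    }
}
```

          Because both subclasses share the `setWindow` entry point, the context code that uses the store does not need an `instanceof` check on the assigner.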

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3479#discussion_r105661470

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java —
          @@ -53,22 +57,47 @@ public InternalAggregateProcessAllWindowFunction(
          }

          @Override
          - public void apply(Byte key, final W window, Iterable<T> input, Collector<R> out) throws Exception {
          + public void open(Configuration parameters) throws Exception {
          + super.open(parameters);
          ProcessAllWindowFunction<V, R, W> wrappedFunction = this.wrappedFunction;
          - ProcessAllWindowFunction<V, R, W>.Context context = wrappedFunction.new Context() {
          - @Override
          - public W window() {
          - return window;
          - }
          - };
          + this.ctx = new InternalProcessAllWindowContext<>(wrappedFunction);
          + }

          + @Override
          + public void process(Byte aByte, final W window, final InternalWindowContext context, Iterable<T> input, Collector<R> out) throws Exception {
          final ACC acc = aggFunction.createAccumulator();

          for (T val : input) {
          aggFunction.add(val, acc);
          }
          - wrappedFunction.process(context, Collections.singletonList(aggFunction.getResult(acc)), out);
          + this.ctx.window = window;
          + this.ctx.internalContext = context;
          + ProcessAllWindowFunction<V, R, W> wrappedFunction = this.wrappedFunction;
          + wrappedFunction.process(ctx, Collections.singletonList(aggFunction.getResult(acc)), out);
          + }
          +
          + @Override
          + public void clear(final W window, final InternalWindowContext context) throws Exception {
          + ProcessAllWindowFunction<V, R, W> wrappedFunction = this.wrappedFunction;
          + final ProcessAllWindowFunction<V, R, W>.Context ctx = wrappedFunction.new Context() {
          — End diff –

          leftover anonymous inner `Context`. This should also use `this.ctx` like in `process()`.
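          A sketch of what is being asked for: the context is created once in `open()`, and both `process()` and `clear()` update and pass that same instance, so no anonymous `Context` is left in `clear()`. `AllWindowFn`, `SharedCtx` and `AggregateWrapper` are hypothetical simplifications of the wrapped `ProcessAllWindowFunction`, its context, and `InternalAggregateProcessAllWindowFunction`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified wrapped function with a per-window clear() hook.
interface AllWindowFn<W> {
    void process(SharedCtx<W> ctx, long result, List<String> out);

    void clear(SharedCtx<W> ctx);
}

// Hypothetical reusable context: only the window changes between calls.
final class SharedCtx<W> {
    W window;

    W window() {
        return window;
    }
}

// Hypothetical wrapper mirroring the open()/process()/clear() split in the diff.
final class AggregateWrapper<W> {
    private final AllWindowFn<W> wrapped;
    private SharedCtx<W> ctx;

    AggregateWrapper(AllWindowFn<W> wrapped) {
        this.wrapped = wrapped;
    }

    void open() {
        ctx = new SharedCtx<>(); // created once, like this.ctx in open()
    }

    void process(W window, Iterable<Long> input, List<String> out) {
        long acc = 0;
        for (long v : input) {
            acc += v;
        }
        ctx.window = window;
        wrapped.process(ctx, acc, out);
    }

    void clear(W window) {
        ctx.window = window; // reuse the same instance instead of a new anonymous context
        wrapped.clear(ctx);
    }
}
```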

          githubbot ASF GitHub Bot added a comment -

          Github user sjwiesman commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3479#discussion_r105763162

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java —
          @@ -53,22 +57,47 @@ public InternalAggregateProcessAllWindowFunction(
          }

          @Override
          - public void apply(Byte key, final W window, Iterable<T> input, Collector<R> out) throws Exception {
          + public void open(Configuration parameters) throws Exception {
          + super.open(parameters);
          ProcessAllWindowFunction<V, R, W> wrappedFunction = this.wrappedFunction;
          - ProcessAllWindowFunction<V, R, W>.Context context = wrappedFunction.new Context() {
          - @Override
          - public W window() {
          - return window;
          - }
          - };
          + this.ctx = new InternalProcessAllWindowContext<>(wrappedFunction);
          + }

          + @Override
          + public void process(Byte aByte, final W window, final InternalWindowContext context, Iterable<T> input, Collector<R> out) throws Exception {
          final ACC acc = aggFunction.createAccumulator();

          for (T val : input) {
          aggFunction.add(val, acc);
          }
          - wrappedFunction.process(context, Collections.singletonList(aggFunction.getResult(acc)), out);
          + this.ctx.window = window;
          + this.ctx.internalContext = context;
          + ProcessAllWindowFunction<V, R, W> wrappedFunction = this.wrappedFunction;
          + wrappedFunction.process(ctx, Collections.singletonList(aggFunction.getResult(acc)), out);
          + }
          +
          + @Override
          + public void clear(final W window, final InternalWindowContext context) throws Exception {
          + ProcessAllWindowFunction<V, R, W> wrappedFunction = this.wrappedFunction;
          + final ProcessAllWindowFunction<V, R, W>.Context ctx = wrappedFunction.new Context() {
          — End diff –

          whoops 😱

          githubbot ASF GitHub Bot added a comment -

          Github user sjwiesman commented on the issue:

          https://github.com/apache/flink/pull/3479

          Sorry for the delay, things got crazy at work. Let me know if there are any issues.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3479

          Don't worry. 😃 Is it ready for another review pass now?

          githubbot ASF GitHub Bot added a comment -

          Github user sjwiesman commented on the issue:

          https://github.com/apache/flink/pull/3479

          It looks like when I rebased on master I broke one of the Scala side output tests. I'm going to push a fix right now, but it won't change any of the code surrounding this PR.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3479

          Ok, please ping me when you pushed the fix. 😃

          githubbot ASF GitHub Bot added a comment -

          Github user sjwiesman commented on the issue:

          https://github.com/apache/flink/pull/3479

          Pushed the fix. I had to update SideOutputsITCase so that its ProcessAllWindowFunctions have a no-op clear method. All tests passed locally; take a look.
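
The fix above (giving each test ProcessAllWindowFunction a no-op clear method) can be illustrated with a small, self-contained Java model. The names `NoopClearSketch`, `Context`, `WindowFn`, `Sum`, `FiringCounter`, and `fireTwiceThenClear` are hypothetical and are not Flink's API. The sketch only assumes, as this thread suggests, that at this point in the PR the base class declared clear() without a default body, so every subclass had to supply one, and that per-window state is handed to the function through its context.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: every type here is a hypothetical stand-in, not a Flink class.
final class NoopClearSketch {

    // Stand-in for a window context: the window being evaluated plus a
    // state map scoped to that window.
    static final class Context {
        final String window;
        final Map<String, Long> windowState;

        Context(String window, Map<String, Long> windowState) {
            this.window = window;
            this.windowState = windowState;
        }
    }

    // Stand-in for a window function base class in which clear() is abstract,
    // so every subclass must implement it, even if it does nothing.
    abstract static class WindowFn {
        abstract void process(Context ctx, List<Integer> elements, List<String> out);

        abstract void clear(Context ctx);
    }

    // Stateless function: it stores nothing per window, so clear() is a no-op.
    static final class Sum extends WindowFn {
        @Override
        void process(Context ctx, List<Integer> elements, List<String> out) {
            int sum = 0;
            for (int e : elements) {
                sum += e;
            }
            out.add(ctx.window + " sum=" + sum);
        }

        @Override
        void clear(Context ctx) {
            // no-op: nothing was stored for this window
        }
    }

    // Stateful function: counts firings of the same window in per-window
    // state and removes that state when the window is cleared.
    static final class FiringCounter extends WindowFn {
        @Override
        void process(Context ctx, List<Integer> elements, List<String> out) {
            long n = ctx.windowState.merge("firings", 1L, Long::sum);
            out.add(ctx.window + " firing #" + n);
        }

        @Override
        void clear(Context ctx) {
            ctx.windowState.remove("firings");
        }
    }

    // Simulates an early firing, a second firing, then window cleanup.
    static List<String> fireTwiceThenClear(WindowFn fn, Map<String, Long> state) {
        Context ctx = new Context("w1", state);
        List<String> out = new ArrayList<>();
        fn.process(ctx, Arrays.asList(1, 2), out);
        fn.process(ctx, Arrays.asList(1, 2, 3), out);
        fn.clear(ctx);
        return out;
    }

    public static void main(String[] args) {
        Map<String, Long> counterState = new HashMap<>();
        System.out.println(fireTwiceThenClear(new FiringCounter(), counterState)); // [w1 firing #1, w1 firing #2]
        System.out.println(counterState); // {}
        System.out.println(fireTwiceThenClear(new Sum(), new HashMap<>())); // [w1 sum=3, w1 sum=6]
    }
}
```

The point of the split is that a function which never writes per-window state can satisfy clear() with an empty body, while one that does write state is responsible for removing it when the window is discarded.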

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3479

          Thanks!

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3479

          Thanks for implementing this! 😃

          I just merged, could you please close this PR?

          aljoscha Aljoscha Krettek added a comment -

          Implemented on master in
          fad201bfb0b1f2757f68f7b3ffaf97a486eb93e8

          aljoscha Aljoscha Krettek added a comment -

          Seth Wiesman I created FLINK-6163 and FLINK-6164 as follow-up issues. Just letting you know in case you're interested.

          githubbot ASF GitHub Bot added a comment -

          Github user sjwiesman commented on the issue:

          https://github.com/apache/flink/pull/3479

          Done! Thank you for helping me get this feature merged in. This has to be one of the most painless commits I've ever made to an open source project of this size.

          githubbot ASF GitHub Bot added a comment -

          Github user sjwiesman closed the pull request at:

          https://github.com/apache/flink/pull/3479

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3479

          Hehe, thanks! 😄


            People

            • Assignee:
              sjwiesman Seth Wiesman
              Reporter:
              aljoscha Aljoscha Krettek
            • Votes:
              0
              Watchers:
              5
