  Flink / FLINK-4953

Allow access to "time" in ProcessWindowFunction.Context

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 1.3.0
    • Component/s: DataStream API
    • Labels:
      None

      Description

      The recently added ProcessWindowFunction has a Context object that allows querying additional information about the window firing that is being processed. Right now, the only such information is the window for which the firing is happening. We should extend this with methods that allow querying the current processing time and the current watermark.

      Original text by issue creator: This is similar to FLINK-3674 but exposes time information in window functions. Currently, when a timer fires, all state in a window is returned to the user, including elements that come after the timer. This change would allow users to filter out state after the timer based on time information.
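The filtering use case in the description can be sketched without Flink. The `WindowContext` interface, `Timestamped` class, and `LateElementFilter.keepUpToWatermark` helper below are hypothetical stand-ins, not Flink's API; they only mirror the idea of a firing context that exposes the current watermark, so that buffered elements with timestamps past it can be dropped.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a window-firing context; NOT Flink's ProcessWindowFunction.Context.
interface WindowContext {
    long currentProcessingTime();
    long currentWatermark();
}

// A timestamped element, as it might be buffered in window state.
final class Timestamped<T> {
    final T value;
    final long timestamp;

    Timestamped(T value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

final class LateElementFilter {
    private LateElementFilter() {}

    // Keeps only elements whose timestamp is at or before the current watermark,
    // i.e. drops buffered elements that lie "after" the time the firing corresponds to.
    static <T> List<T> keepUpToWatermark(Iterable<Timestamped<T>> elements, WindowContext ctx) {
        long watermark = ctx.currentWatermark(); // queried once per firing
        List<T> result = new ArrayList<>();
        for (Timestamped<T> e : elements) {
            if (e.timestamp <= watermark) {
                result.add(e.value);
            }
        }
        return result;
    }
}
```

In Flink itself, the corresponding calls would live on `ProcessWindowFunction.Context`; the names `currentProcessingTime()` and `currentWatermark()` used here match the `WindowContext` snippet later in this thread.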

        Issue Links

          Activity

          githubbot ASF GitHub Bot added a comment -

          Github user manuzhang closed the pull request at:

          https://github.com/apache/flink/pull/3661

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3661

          Merged. Thanks again for working on this! 😄

          Could you please close this PR?

          aljoscha Aljoscha Krettek added a comment -

          Implemented in db759c5303bef8b412a183b4bcdf6d11abb7f327

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3661

          Thanks! 😃

          I'll let Travis run and then merge.

          githubbot ASF GitHub Bot added a comment -

          Github user manuzhang commented on the issue:

          https://github.com/apache/flink/pull/3661

          By the way, great conference and great content at http://sf.flink-forward.org/

          githubbot ASF GitHub Bot added a comment -

          Github user manuzhang commented on the issue:

          https://github.com/apache/flink/pull/3661

          @aljoscha Yeah, that looks better. Thanks.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3661

          Thanks, @manuzhang. I had another look at your changes. I would merge them now but simplify the tests and `WindowOperator` a little, if that's alright with you.

          In `WindowOperator` I would change `WindowContext` to query the `InternalTimerService` directly, as in:
          ```
          @Override
          public long currentProcessingTime() {
              return internalTimerService.currentProcessingTime();
          }

          @Override
          public long currentWatermark() {
              return internalTimerService.currentWatermark();
          }
          ```
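The delegation proposed above can be modeled in isolation. In this sketch, `TimerService`, `FakeTimerService`, and `DelegatingWindowContext` are hypothetical names standing in for `InternalTimerService` and `WindowContext`; the point is that the context reads from the service on every call instead of caching values, so it always reports the service's latest time.

```java
// Hypothetical minimal timer-service abstraction (stand-in for InternalTimerService).
interface TimerService {
    long currentProcessingTime();
    long currentWatermark();
}

// Test double whose clocks can be set manually.
final class FakeTimerService implements TimerService {
    private long processingTime = Long.MIN_VALUE;
    private long watermark = Long.MIN_VALUE;

    void setProcessingTime(long t) {
        processingTime = t;
    }

    // Watermarks are monotonic, so an older value never moves the watermark backwards.
    void advanceWatermark(long w) {
        watermark = Math.max(watermark, w);
    }

    @Override
    public long currentProcessingTime() {
        return processingTime;
    }

    @Override
    public long currentWatermark() {
        return watermark;
    }
}

// Context that forwards each query to the timer service rather than storing a snapshot.
final class DelegatingWindowContext {
    private final TimerService timerService;

    DelegatingWindowContext(TimerService timerService) {
        this.timerService = timerService;
    }

    long currentProcessingTime() {
        return timerService.currentProcessingTime();
    }

    long currentWatermark() {
        return timerService.currentWatermark();
    }
}
```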
          I would introduce a specific test, like this:
          ```
          @Test
          public void testEventTimeQuerying() throws Exception {
              testCurrentTimeQuerying(new EventTimeAdaptor());
          }

          @Test
          public void testProcessingTimeQuerying() throws Exception {
              testCurrentTimeQuerying(new ProcessingTimeAdaptor());
          }

          public void testCurrentTimeQuerying(final TimeDomainAdaptor timeAdaptor) throws Exception {
              WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
              timeAdaptor.setIsEventTime(mockAssigner);
              Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
              InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();

              final KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
                      createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);

              testHarness.open();

              shouldFireOnElement(mockTrigger);

              when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
                      .thenReturn(Arrays.asList(new TimeWindow(0, 20)));

              doAnswer(new Answer<Object>() {
                  @Override
                  public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                      InternalWindowFunction.InternalWindowContext context =
                              (InternalWindowFunction.InternalWindowContext) invocationOnMock.getArguments()[2];
                      timeAdaptor.verifyCorrectTime(testHarness, context);
                      return null;
                  }
              }).when(mockWindowFunction).process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());

              doAnswer(new Answer<Object>() {
                  @Override
                  public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                      InternalWindowFunction.InternalWindowContext context =
                              (InternalWindowFunction.InternalWindowContext) invocationOnMock.getArguments()[1];
                      timeAdaptor.verifyCorrectTime(testHarness, context);
                      return null;
                  }
              }).when(mockWindowFunction).clear(anyTimeWindow(), anyInternalWindowContext());

              timeAdaptor.advanceTime(testHarness, 10);

              testHarness.processElement(new StreamRecord<>(0, 0L));

              verify(mockWindowFunction, times(1)).process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());

              timeAdaptor.advanceTime(testHarness, 100);

              verify(mockWindowFunction, times(1)).clear(anyTimeWindow(), anyInternalWindowContext());
          }
          ```

          What do you think? I would amend your commit with these changes and merge everything as a single commit.
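The `TimeDomainAdaptor` parameter in the test above lets one test body run against both event time and processing time. A framework-free sketch of that pattern follows; `OperatorClock`, `TimeDomain`, `EventTimeDomain`, `ProcessingTimeDomain`, and `TimeQueryCheck` are hypothetical and only loosely mirror the `advanceTime`/`verifyCorrectTime` roles from the snippet, not the real test classes.

```java
// Hypothetical operator clock holding both time domains.
final class OperatorClock {
    long processingTime = 0L;
    long watermark = Long.MIN_VALUE;
}

// One implementation per time domain; the shared test logic only talks to this interface.
interface TimeDomain {
    void advanceTime(OperatorClock clock, long time);
    long currentTime(OperatorClock clock);
}

final class EventTimeDomain implements TimeDomain {
    @Override
    public void advanceTime(OperatorClock clock, long time) {
        clock.watermark = time; // event time advances via the watermark
    }

    @Override
    public long currentTime(OperatorClock clock) {
        return clock.watermark;
    }
}

final class ProcessingTimeDomain implements TimeDomain {
    @Override
    public void advanceTime(OperatorClock clock, long time) {
        clock.processingTime = time; // processing time advances via the wall clock
    }

    @Override
    public long currentTime(OperatorClock clock) {
        return clock.processingTime;
    }
}

final class TimeQueryCheck {
    private TimeQueryCheck() {}

    // Shared "test body": advance the chosen domain twice and check the new time is visible each time.
    static boolean reportsAdvancedTime(TimeDomain domain) {
        OperatorClock clock = new OperatorClock();
        domain.advanceTime(clock, 10L);
        if (domain.currentTime(clock) != 10L) {
            return false;
        }
        domain.advanceTime(clock, 100L);
        return domain.currentTime(clock) == 100L;
    }
}
```

The design choice is the same as in the proposed test: the assertions are written once, and the domain-specific parts (how to advance time, which clock to read) are swapped in through a small interface.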

          githubbot ASF GitHub Bot added a comment -

          Github user manuzhang commented on the issue:

          https://github.com/apache/flink/pull/3661

          @aljoscha Updated as you proposed, although I'm not sure I've covered all the needed tests (properly).

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3661

          I'm currently in SF for http://sf.flink-forward.org and quite busy, which is why my responses are slow. Sorry about that.

          A quick answer: there is the (slightly hard to find) `InternalWindowFunctionTest`, which tests that the internal window functions correctly forward calls to a process window function and other (user-facing) window functions. We should test the internal logic of the new time access code for `InternalWindowFunction` in `WindowOperatorContractTest` (see, for example, `testPerWindowStateSetAndClearedOnEventTimePurge()`). We should also add tests to `InternalWindowFunctionTest` that verify that per-window state and the new time access are correctly forwarded.

          What do you think?
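The forwarding that `InternalWindowFunctionTest` is meant to check can be illustrated with a minimal sketch under assumed names: `InternalCtx`, `UserCtx`, `UserFunction`, and `ForwardingWrapper` are hypothetical stand-ins for `InternalWindowContext`, `ProcessWindowFunction.Context`, and an internal wrapper function. The wrapper builds a user-facing context whose time methods forward to the internal one, and a recording user function confirms that the values arrive unchanged.

```java
import java.util.List;

// Hypothetical internal-side context (stand-in for InternalWindowFunction.InternalWindowContext).
interface InternalCtx {
    long currentProcessingTime();
    long currentWatermark();
}

// Hypothetical user-facing context (stand-in for ProcessWindowFunction.Context).
interface UserCtx {
    long currentProcessingTime();
    long currentWatermark();
}

// Hypothetical user function; emits results into a list instead of a Collector.
interface UserFunction {
    void process(String key, Iterable<Integer> elements, UserCtx ctx, List<String> out);
}

// Adapts the internal context to the user context by forwarding every call.
final class ForwardingWrapper {
    private final UserFunction userFunction;

    ForwardingWrapper(UserFunction userFunction) {
        this.userFunction = userFunction;
    }

    void process(String key, Iterable<Integer> elements, InternalCtx internal, List<String> out) {
        UserCtx ctx = new UserCtx() {
            @Override
            public long currentProcessingTime() {
                return internal.currentProcessingTime();
            }

            @Override
            public long currentWatermark() {
                return internal.currentWatermark();
            }
        };
        userFunction.process(key, elements, ctx, out);
    }
}
```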

          githubbot ASF GitHub Bot added a comment -

          Github user manuzhang commented on the issue:

          https://github.com/apache/flink/pull/3661

          It seems `WindowOperatorContractTest` is for `InternalWindowFunction`, while there are no existing tests for `ProcessWindowFunction`. Should I add them there?

          githubbot ASF GitHub Bot added a comment -

          Github user manuzhang commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3661#discussion_r109821341

          — Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java —
          @@ -2560,7 +2560,7 @@ public int compare(Object o1, Object o2) {
                   if (comparison != 0) {
                       return comparison;
                   }
          -        return (int) (sr0.getValue().f1 - sr1.getValue().f1);
          +        return (int) (sr0.getValue().f2 - sr1.getValue().f2);
          End diff

          @aljoscha, this line compares the second field again rather than the third field.
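A corrected comparator might look like this sketch. It assumes a hypothetical `Triple` class in place of `Tuple3<String, Long, Long>` and a field order of f0, then f1, then f2, since the diff does not show the whole method. It also uses `Long.compare` instead of casting a subtraction to `int`, because `(int) (a - b)` can overflow or truncate for large or negative values and return the wrong sign.

```java
import java.util.Comparator;

// Hypothetical stand-in for Tuple3<String, Long, Long>.
final class Triple {
    final String f0;
    final long f1;
    final long f2;

    Triple(String f0, long f1, long f2) {
        this.f0 = f0;
        this.f1 = f1;
        this.f2 = f2;
    }
}

final class TripleComparator implements Comparator<Triple> {
    @Override
    public int compare(Triple a, Triple b) {
        int comparison = a.f0.compareTo(b.f0);
        if (comparison != 0) {
            return comparison;
        }
        // Long.compare avoids the overflow/truncation risk of (int) (a.f1 - b.f1).
        comparison = Long.compare(a.f1, b.f1);
        if (comparison != 0) {
            return comparison;
        }
        // The fix discussed above: break the final tie on the third field, not the second again.
        return Long.compare(a.f2, b.f2);
    }
}
```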

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3661

          I'd like to keep it for now, since it might help catch some bugs and doesn't take long to run.

          What was the bug in the comparator?

          githubbot ASF GitHub Bot added a comment -

          Github user manuzhang commented on the issue:

          https://github.com/apache/flink/pull/3661

          Okay. Shall we open a JIRA issue to deprecate `WindowOperatorTest`? I also found a minor bug in `WindowOperatorTest#Tuple3ResultSortComparator` (the same bug exists in `WindowOperatorMigrationTest`). Do we want to fix it?

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3661

          Thanks for opening this PR. 😃

          I had a few comments and I think it's easier to collect them here instead of commenting on the individual files:

          • I would prefer not to call `context.getCurrent*Time()` every time, even when it's not needed, because this can be costly. Instead, we can pass the `Context` and only invoke the methods when needed.
          • I regard `WindowOperatorTest` as legacy; new tests should be added to `WindowOperatorContractTest`. This is a base class with many test cases that is extended by `RegularWindowOperatorTest` and `EvictingWindowOperatorTest`.

          Neither of these is easy to discover, and it's not obvious which test should go where. Sorry for the inconvenience.
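The cost argument in the first point can be made concrete with a small sketch. `CountingTimeSource`, `LazyTimeContext`, and `FiringStrategies` are hypothetical names; the counter stands in for a time lookup that may be expensive. The eager strategy reads both clocks on every firing, while the lazy one hands the function a context and reads a clock only when the function asks for it.

```java
import java.util.function.LongBinaryOperator;
import java.util.function.ToLongFunction;

// Hypothetical time source that counts how often it is queried (a stand-in for a costly lookup).
final class CountingTimeSource {
    int queries = 0;

    long processingTime() {
        queries++;
        return 100L;
    }

    long watermark() {
        queries++;
        return 50L;
    }
}

// Context that defers the lookups until the user function calls them.
final class LazyTimeContext {
    private final CountingTimeSource source;

    LazyTimeContext(CountingTimeSource source) {
        this.source = source;
    }

    long currentProcessingTime() {
        return source.processingTime();
    }

    long currentWatermark() {
        return source.watermark();
    }
}

final class FiringStrategies {
    private FiringStrategies() {}

    // Eager: compute both times up front for every firing, whether or not the function uses them.
    static long eagerFire(CountingTimeSource source, LongBinaryOperator fn) {
        long processingTime = source.processingTime();
        long watermark = source.watermark();
        return fn.applyAsLong(processingTime, watermark);
    }

    // Lazy: pass a context and let the function decide which clocks, if any, to read.
    static long lazyFire(CountingTimeSource source, ToLongFunction<LazyTimeContext> fn) {
        return fn.applyAsLong(new LazyTimeContext(source));
    }
}
```

With three firings whose function ignores time, the eager strategy performs six lookups and the lazy one performs none.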

          githubbot ASF GitHub Bot added a comment -

          Github user manuzhang commented on the issue:

          https://github.com/apache/flink/pull/3661

          @aljoscha

          githubbot ASF GitHub Bot added a comment -

          Github user manuzhang commented on the issue:

          https://github.com/apache/flink/pull/3661

          Not sure what happened here, although the tests passed in most cases. The failure looks unrelated to this PR.

          ```
          [ERROR] Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:2.18.1:test (default-test) on project flink-runtime-web_2.10: There are test failures.
          [ERROR]
          [ERROR] Please refer to /home/travis/build/apache/flink/flink-runtime-web/target/surefire-reports for the individual test results.
          [ERROR] -> [Help 1]
          [ERROR]
          [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
          [ERROR] Re-run Maven using the -X switch to enable full debug logging.
          [ERROR]
          [ERROR] For more information about the errors and possible solutions, please read the following articles:
          [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
          [ERROR]
          [ERROR] After correcting the problems, you can resume the build with the command
          [ERROR] mvn <goals> -rf :flink-runtime-web_2.10
          Trying to KILL watchdog (1513).
          ./tools/travis_mvn_watchdog.sh: line 210: 1513 Terminated watchdog
          MVN exited with EXIT CODE: 1.
          java.io.FileNotFoundException: build-target/lib/flink-dist-*.jar (No such file or directory)
          at java.util.zip.ZipFile.open(Native Method)
          at java.util.zip.ZipFile.<init>(ZipFile.java:215)
          at java.util.zip.ZipFile.<init>(ZipFile.java:145)
          at java.util.zip.ZipFile.<init>(ZipFile.java:116)
          at sun.tools.jar.Main.list(Main.java:1004)
          at sun.tools.jar.Main.run(Main.java:245)
          at sun.tools.jar.Main.main(Main.java:1177)
          find: `./flink-yarn-tests/target/flink-yarn-tests*': No such file or directory
          PRODUCED build artifacts.
          1.log 2.log build_info mvn.out
          COMPRESSING build artifacts.
          1.log
          2.log
          build_info
          mvn.out
          Uploading to transfer.sh
          The command "./tools/travis_mvn_watchdog.sh 300" exited with 1.
          Done. Your build exited with 1.
          ```

          githubbot ASF GitHub Bot added a comment -

          GitHub user manuzhang opened a pull request:

          https://github.com/apache/flink/pull/3661

          FLINK-4953 Allow access to "time" in ProcessWindowFunction.Context

          Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
          If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
          In addition to going through the list, please provide a meaningful description of your changes.

          • [ ] General
            • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
            • The pull request addresses only one issue
            • Each commit in the PR has a meaningful commit message (including the JIRA id)
          • [ ] Documentation
            • Documentation has been added for new functionality
            • Old documentation affected by the pull request has been updated
            • JavaDoc for public methods has been added
          • [ ] Tests & Build
            • Functionality added by the pull request is covered by tests
            • `mvn clean verify` has been executed successfully locally or a Travis build has passed

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/manuzhang/flink process_window_function

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3661.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3661


          commit 2f37836f462c217d627088f0266e8a3222b9d919
          Author: manuzhang <owenzhang1990@gmail.com>
          Date: 2017-04-02T11:27:14Z

          FLINK-4953 Allow access to "time" in ProcessWindowFunction.Context


          mauzhang Manu Zhang added a comment -

          Aljoscha Krettek, yes, I'd love to. I'll assign it to myself.

          aljoscha Aljoscha Krettek added a comment -

          Manu Zhang, would you be interested in working on this now that ProcessWindowFunction is in? It should be quite straightforward; the hardest part will probably be the tests.


            People

            • Assignee:
              mauzhang Manu Zhang
              Reporter:
              mauzhang Manu Zhang
            • Votes:
              0
              Watchers:
              4

              Dates

              • Created:
                Updated:
                Resolved:

                Development