FLINK-4862

NPE on EventTimeSessionWindows with ContinuousEventTimeTrigger

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 1.2.0, 1.1.3
    • Fix Version/s: 1.2.0, 1.1.4
    • Component/s: DataStream API, Streaming
    • Labels:
      None

      Description

      What's the error?

      The following NullPointerException is thrown when EventTimeSessionWindows is used with ContinuousEventTimeTrigger.

      Caused by: java.lang.NullPointerException
      	at org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger.clear(ContinuousEventTimeTrigger.java:91)
      	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.clear(WindowOperator.java:768)
      	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:310)
      	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:297)
      	at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:196)
      	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:297)
      	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:183)
      	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
      	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:271)
      	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:609)
      	at java.lang.Thread.run(Thread.java:745)
      

      How to reproduce?

      Use ContinuousEventTimeTrigger instead of the default EventTimeTrigger in the SessionWindowing example.

      What's the cause?

      When two session windows are merged, the states of the two ContinuousEventTimeTriggers are merged as well, and the merged state lives under a new namespace, the merged window. Later, when the context tries to delete the timer of an old trigger, clear() looks up the fire timestamp under the old namespace and gets null. Assigning that null to a primitive long throws the NPE.
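The failure mode described above can be reproduced without Flink. The following is a minimal sketch, assuming a plain map keyed by window as a stand-in for namespaced trigger state; the class and method names (MergedNamespaceLookup, merge, clearUnguardedThrows, clearGuarded) are hypothetical and are not part of Flink's API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for per-window ("namespaced") trigger state; not Flink's API.
class MergedNamespaceLookup {

    // Moves the fire timestamps of the source windows into the target window's
    // namespace, keeping the minimum, and removes the old entries.
    static void merge(Map<String, Long> state, String target, String... sources) {
        Long min = state.get(target);
        for (String source : sources) {
            Long ts = state.remove(source);
            if (ts != null && (min == null || ts < min)) {
                min = ts;
            }
        }
        if (min != null) {
            state.put(target, min);
        }
    }

    // Mirrors the original clear(): the looked-up value is unboxed into a primitive long.
    static boolean clearUnguardedThrows(Map<String, Long> state, String window) {
        try {
            long timestamp = state.get(window); // throws NPE when there is no entry
            state.remove(window);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    // Mirrors the null check: keep the boxed Long and test it before using it.
    static boolean clearGuarded(Map<String, Long> state, String window) {
        Long timestamp = state.get(window);
        if (timestamp != null) {
            state.remove(window);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, Long> state = new HashMap<>();
        state.put("[0,10)", 5L);
        state.put("[8,20)", 15L);
        merge(state, "[0,20)", "[0,10)", "[8,20)");

        // The old namespace no longer holds a value after the merge.
        System.out.println(clearUnguardedThrows(state, "[0,10)"));
        System.out.println(clearGuarded(state, "[0,10)"));
    }
}
```

The unguarded variant fails only because of the implicit unboxing; reading the value as a boxed Long and checking it for null is enough to avoid the exception.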

          Activity

          mauzhang Manu Zhang added a comment -

          A simple fix is to check for null return when looking up the fireTimestamp, e.g.

          ContinuousEventTimeTrigger.java
          	@Override
          	public void clear(W window, TriggerContext ctx) throws Exception {
          		ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
          		Long timestamp = fireTimestamp.get();
          		if (timestamp != null) {
          			ctx.deleteEventTimeTimer(timestamp);
          		}
          		fireTimestamp.clear();
          	}
          
          githubbot ASF GitHub Bot added a comment -

          GitHub user manuzhang opened a pull request:

          https://github.com/apache/flink/pull/2671

          FLINK-4862 fix Timer register in ContinuousEventTimeTrigger

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/manuzhang/flink fix_merge_window

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/2671.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #2671


          commit b2370946357044250511e25fce5078812ad22c82
          Author: manuzhang <owenzhang1990@gmail.com>
          Date: 2016-10-20T07:06:01Z

          FLINK-4862 fix Timer register in ContinuousEventTimeTrigger


          mxm Maximilian Michels added a comment -

          Thanks for reporting, Manu Zhang. I've made you a contributor in JIRA and assigned you to the issue. I'll also check out your PR.

          githubbot ASF GitHub Bot added a comment -

          Github user mxm commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2671#discussion_r84463868

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java ---
          @@ -99,8 +111,12 @@ public boolean canMerge() {
           	}

           	@Override
          -	public TriggerResult onMerge(W window, OnMergeContext ctx) {
          +	public TriggerResult onMerge(W window, OnMergeContext ctx) throws Exception {
           		ctx.mergePartitionedState(stateDesc);
          +		Long nextFireTimestamp = ctx.getPartitionedState(stateDesc).get();
          +		if (nextFireTimestamp != null) {
          +			ctx.registerEventTimeTimer(nextFireTimestamp);
          +		}
          --- End diff --

          +1

          githubbot ASF GitHub Bot added a comment -

          Github user mxm commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2671#discussion_r84464167

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java ---
          @@ -88,9 +96,13 @@ public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) t
           	@Override
           	public void clear(W window, TriggerContext ctx) throws Exception {
           		ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
          -		long timestamp = fireTimestamp.get();
          -		ctx.deleteEventTimeTimer(timestamp);
          -		fireTimestamp.clear();
          +		Long timestamp = fireTimestamp.get();
          +		if (timestamp != null) {
          +			ctx.deleteEventTimeTimer(timestamp);
          +			fireTimestamp.clear();
          +		} else if (cachedFireTimestamp != null) {
          +			ctx.deleteEventTimeTimer(cachedFireTimestamp);
          +		}
          --- End diff --

          The above `else if` block is not correct because there is only one instance of the trigger, which is reused for every Window. That is why the trigger goes through the state descriptor to retrieve the appropriate state.

          githubbot ASF GitHub Bot added a comment -

          Github user mxm commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2671#discussion_r84465211

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java ---
          @@ -88,9 +96,13 @@ public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) t
           	@Override
           	public void clear(W window, TriggerContext ctx) throws Exception {
           		ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
          -		long timestamp = fireTimestamp.get();
          -		ctx.deleteEventTimeTimer(timestamp);
          -		fireTimestamp.clear();
          +		Long timestamp = fireTimestamp.get();
          +		if (timestamp != null) {
          +			ctx.deleteEventTimeTimer(timestamp);
          +			fireTimestamp.clear();
          +		} else if (cachedFireTimestamp != null) {
          +			ctx.deleteEventTimeTimer(cachedFireTimestamp);
          +		}
          --- End diff --

          I think we're fine with not doing anything when `timestamp == null`. The old timer won't influence the newly merged window.

          githubbot ASF GitHub Bot added a comment -

          Github user mxm commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2671#discussion_r84464275

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java ---
          @@ -45,6 +45,12 @@
           	private final ReducingStateDescriptor<Long> stateDesc =
           			new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);

          +	/**
          +	 * Used to preserve the fire timestamp before merge such that
          +	 * the corresponding timer could be cleared after merge
          +	 */
          +	private Long cachedFireTimestamp = null;
          --- End diff --

          This doesn't work because there is only one `Trigger` instance and this will potentially be overwritten by many Windows being merged.
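The point about a single shared `Trigger` instance can be illustrated with a small sketch. It assumes a hypothetical SharedTriggerField class in which one object handles several windows; a plain map keyed by window stands in for state retrieved through a state descriptor. None of these names come from Flink.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of one trigger object serving many windows; not Flink's classes.
class SharedTriggerField {

    // Instance field: a single slot shared by every window the trigger handles.
    private Long cachedFireTimestamp = null;

    // Per-window state keyed by window, analogous to state looked up by namespace.
    private final Map<String, Long> perWindowState = new HashMap<>();

    void onElement(String window, long fireTimestamp) {
        cachedFireTimestamp = fireTimestamp;       // overwritten by every window
        perWindowState.put(window, fireTimestamp); // isolated per window
    }

    // The field cannot tell windows apart, so the argument is effectively ignored.
    Long cachedFor(String window) {
        return cachedFireTimestamp;
    }

    Long stateFor(String window) {
        return perWindowState.get(window);
    }

    public static void main(String[] args) {
        SharedTriggerField trigger = new SharedTriggerField();
        trigger.onElement("windowA", 100L);
        trigger.onElement("windowB", 200L);

        // The field now holds windowB's value even when asked about windowA.
        System.out.println(trigger.cachedFor("windowA"));
        System.out.println(trigger.stateFor("windowA"));
    }
}
```

In this sketch the field returns whatever the most recent window wrote, while the keyed lookup still returns the value belonging to the requested window.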

          githubbot ASF GitHub Bot added a comment -

          Github user mxm commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2671#discussion_r84465826

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java ---
          @@ -88,9 +96,13 @@ public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) t
           	@Override
           	public void clear(W window, TriggerContext ctx) throws Exception {
           		ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
          -		long timestamp = fireTimestamp.get();
          -		ctx.deleteEventTimeTimer(timestamp);
          -		fireTimestamp.clear();
          +		Long timestamp = fireTimestamp.get();
          +		if (timestamp != null) {
          +			ctx.deleteEventTimeTimer(timestamp);
          +			fireTimestamp.clear();
          --- End diff --

          The above looks good.

          githubbot ASF GitHub Bot added a comment -

          Github user manuzhang commented on the issue:

          https://github.com/apache/flink/pull/2671

          @mxm updated. Thanks for teaching me more about the internals. Ignoring the old Timer actually makes things much simpler.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/2671

          @mxm @manuzhang Yes, the timers will be ignored because there is a check in `onEventTime()`.

          The fix looks good! Will you go ahead and merge, @mxm?
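A guard of the kind mentioned for `onEventTime()` might look roughly like the sketch below. This is an assumption about the general shape of such a check, not a copy of Flink's code: the StaleTimerGuard class, its Result enum, and the rule "fire only when the timer time equals the stored fire timestamp" are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a stale-timer guard; names and the exact check are assumptions.
class StaleTimerGuard {

    enum Result { FIRE, CONTINUE }

    // Fire timestamp currently stored for each window.
    static final Map<String, Long> nextFire = new HashMap<>();

    static Result onEventTime(long time, String window) {
        Long expected = nextFire.get(window);
        // A timer whose time does not match the stored fire timestamp is treated as stale.
        if (expected != null && expected == time) {
            return Result.FIRE;
        }
        return Result.CONTINUE;
    }

    public static void main(String[] args) {
        nextFire.put("[0,20)", 5L);
        // A leftover timer at 15 from a pre-merge window does not match and is ignored.
        System.out.println(onEventTime(15L, "[0,20)"));
        System.out.println(onEventTime(5L, "[0,20)"));
    }
}
```

With a guard like this, leaving an old timer registered costs one extra callback that does nothing, which is why deleting it is not strictly required.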

          githubbot ASF GitHub Bot added a comment -

          Github user mxm commented on the issue:

          https://github.com/apache/flink/pull/2671

          Thanks for having a look @aljoscha. Thanks again for the PR @manuzhang. Merging.

          githubbot ASF GitHub Bot added a comment -

          Github user mxm commented on a diff in the pull request:

          https://github.com/apache/flink/pull/2671#discussion_r84689568

          --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java ---
          @@ -99,8 +111,12 @@ public boolean canMerge() {
           	}

           	@Override
          -	public TriggerResult onMerge(W window, OnMergeContext ctx) {
          +	public TriggerResult onMerge(W window, OnMergeContext ctx) throws Exception {
           		ctx.mergePartitionedState(stateDesc);
          +		Long nextFireTimestamp = ctx.getPartitionedState(stateDesc).get();
          +		if (nextFireTimestamp != null) {
          +			ctx.registerEventTimeTimer(nextFireTimestamp);
          +		}
          --- End diff --

          Yes, you're right. It is actually handled correctly in `EventTimeTrigger` but not for the continuous trigger.

          githubbot ASF GitHub Bot added a comment -

          Github user asfgit closed the pull request at:

          https://github.com/apache/flink/pull/2671

          mxm Maximilian Michels added a comment -

          master: 45762162fe7c23fd921db1e0f826b2906bfe1dcd
          release-1.1: 05a5f460b33828cc8a1e6a45d37b555facc7133f


            People

            • Assignee:
              mauzhang Manu Zhang
              Reporter:
              mauzhang Manu Zhang