  Flink / FLINK-5713

Protect against NPE in WindowOperator window cleanup

    Details

    • Type: Improvement
    • Status: Closed
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 1.2.0
    • Fix Version/s: 1.2.1
    • Component/s: DataStream API
    • Labels:
      None

      Description

      Some misbehaving WindowAssigners can cause windows to be dropped from the merging window set while their cleanup timer is still registered. When that timer fires, the window can no longer be found in the set, and this triggers a NullPointerException.
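The failure can be illustrated with a minimal, self-contained sketch. It does not use Flink's classes: the hypothetical `CleanupTimerSketch` models the merging window set as a plain map from each in-flight window to its state window, and `onCleanupTimer` / `onCleanupTimerUnguarded` are illustrative stand-ins for the operator's timer callback. Looking up a window that was already dropped from the set yields `null`; dereferencing it throws the NPE, while a null check turns the stray timer into a no-op.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the failure in FLINK-5713; these names are
// stand-ins, not Flink's real WindowOperator or MergingWindowSet classes.
public class CleanupTimerSketch {

    /** A time window [start, end), standing in for Flink's TimeWindow. */
    record Window(long start, long end) {}

    /** In-flight window -> window under which its state is stored ("merging window set"). */
    static final Map<Window, Window> mergingWindowSet = new HashMap<>();

    /** Window contents, keyed by state window. */
    static final Map<Window, StringBuilder> contents = new HashMap<>();

    /** Cleanup without a guard: dereferences the lookup result directly. */
    static void onCleanupTimerUnguarded(Window window) {
        Window stateWindow = mergingWindowSet.get(window);
        // If 'window' was already dropped from the set, stateWindow is null and this
        // dereference throws a NullPointerException.
        System.out.println("clearing state window ending at " + stateWindow.end());
        contents.remove(stateWindow);
        mergingWindowSet.remove(window);
    }

    /** Guarded cleanup: returns true if state was cleared, false if the timer was ignored. */
    static boolean onCleanupTimer(Window window) {
        Window stateWindow = mergingWindowSet.get(window);
        if (stateWindow == null) {
            // The window left the set while its cleanup timer was still registered;
            // there is nothing to look up, so ignore the timer.
            return false;
        }
        contents.remove(stateWindow);
        mergingWindowSet.remove(window);
        return true;
    }

    public static void main(String[] args) {
        Window w = new Window(0, 10);
        mergingWindowSet.put(w, w);
        contents.put(w, new StringBuilder("a"));

        // A misbehaving assigner causes the window to be dropped from the set
        // while its cleanup timer is still registered.
        mergingWindowSet.remove(w);

        try {
            onCleanupTimerUnguarded(w);
        } catch (NullPointerException e) {
            System.out.println("unguarded cleanup threw NullPointerException");
        }
        System.out.println("guarded cleanup cleared state: " + onCleanupTimer(w));
    }
}
```

Note that the guard only avoids the exception: in this model the orphaned entry stays in `contents`, so a null check alone does not clean up after the misbehaving assigner.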


          Activity

          githubbot ASF GitHub Bot added a comment -

          GitHub user aljoscha opened a pull request:

          https://github.com/apache/flink/pull/3535

          FLINK-5713 Protect against NPE in WindowOperator window cleanup

          You can merge this pull request into a Git repository by running:

          $ git pull https://github.com/aljoscha/flink jira-5713-window-gc-protection

          Alternatively you can review and apply these changes as the patch at:

          https://github.com/apache/flink/pull/3535.patch

          To close this pull request, make a commit to your master/trunk branch
          with (at least) the following in the commit message:

          This closes #3535


          commit e126758cecbf7ba9a9e2d6c66f004b25af3f1097
          Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
          Date: 2017-02-06T10:10:14Z

          FLINK-5713 Protect against NPE in WindowOperator window cleanup


          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3535

          R: @kl0u for review

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3535#discussion_r105931848

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java —
          @@ -354,22 +354,27 @@ public void merge(W mergeResult,
                      }
                  });

          -       // drop if the window is already late
          -       if (isLate(actualWindow)) {
          -           mergingWindows.retireWindow(actualWindow);
          -           continue;
          -       }
          +       context.key = key;
          +       context.window = actualWindow;

                  W stateWindow = mergingWindows.getStateWindow(actualWindow);
                  if (stateWindow == null) {
                      throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
                  }

                  windowState.setCurrentNamespace(stateWindow);

          -       windowState.add(element.getValue());
          -       context.key = key;
          -       context.window = actualWindow;
          +       // Drop if the window is already late. In rare cases (with a misbehaving
          +       // WindowAssigner) it can happen that a window becomes late that already has
          +       // state (contents, state and timers). That's why we first get the window state
          +       // above and then drop everything.
          +       if (isLate(actualWindow)) {
          +           clearAllState(actualWindow, windowState, mergingWindows);
          +           mergingWindows.persist();
          -- End diff --

          Why move `mergingWindows.persist()` from `clearAllState` to here? And don't we need a null check? How about:
          ```java
          if (mergingWindows != null) {
              mergingWindows.persist();
          }
          ```

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3535#discussion_r105956185

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java —
          @@ -354,22 +354,27 @@ (same quoted hunk as in the first review comment above)
          -- End diff --

          The null check is not needed here, since we know from the enclosing if block (`if (windowAssigner instanceof MergingWindowAssigner)`) that we do in fact have merging windows.

          Now that I look at it, though, I realise that the `mergingWindows.persist()` call is not necessary because we already call it at the end of the if block. So thanks for making me notice! 😃

          I moved the call out of `clearAllState()` in the first place because every place that calls `clearAllState()` already persists afterwards; see, for example, `onEventTime()` and `onProcessingTime()`.
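To illustrate the reordering discussed in this thread, here is a hypothetical, heavily simplified model. `LateWindowDropSketch`, its maps, and its versions of `isLate`, `clearAllState` and `persist` are stand-ins, not Flink's `WindowOperator`. It mirrors two points from the diff and the replies: the state window is looked up before the lateness check, so a late window that already has contents and a cleanup timer is cleared completely; and `clearAllState` does not persist, leaving a single `persist()` call to the caller. The lateness rule (`end - 1 + allowedLateness <= watermark`) is an assumption of the sketch.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the reordered late-window handling in the diff;
// none of these names are Flink's real API.
public class LateWindowDropSketch {

    record Window(long start, long end) {}

    static final long ALLOWED_LATENESS = 0;
    static long currentWatermark;

    static final Map<Window, Window> mergingWindows = new HashMap<>(); // window -> state window
    static final Map<Window, StringBuilder> windowContents = new HashMap<>();
    static final Set<Window> cleanupTimers = new HashSet<>();
    static int persistCalls;

    static void reset() {
        currentWatermark = Long.MIN_VALUE;
        mergingWindows.clear();
        windowContents.clear();
        cleanupTimers.clear();
        persistCalls = 0;
    }

    /** Late once the watermark has passed the window's last timestamp plus allowed lateness. */
    static boolean isLate(Window w) {
        return w.end() - 1 + ALLOWED_LATENESS <= currentWatermark;
    }

    /** Drops contents, cleanup timer and set entry; persisting is left to the caller. */
    static void clearAllState(Window window, Window stateWindow) {
        windowContents.remove(stateWindow);
        cleanupTimers.remove(window);
        mergingWindows.remove(window);
    }

    /** Stand-in for writing the merging window set to state. */
    static void persist() {
        persistCalls++;
    }

    static void processElement(String value, Window actualWindow) {
        mergingWindows.putIfAbsent(actualWindow, actualWindow);

        // Look up the state window before the lateness check, so that a late window
        // that already has state can be cleaned up completely.
        Window stateWindow = mergingWindows.get(actualWindow);

        if (isLate(actualWindow)) {
            clearAllState(actualWindow, stateWindow);
        } else {
            windowContents.computeIfAbsent(stateWindow, k -> new StringBuilder()).append(value);
            cleanupTimers.add(actualWindow);
        }

        // A single persist() at the end, rather than one in clearAllState() plus one here.
        persist();
    }

    public static void main(String[] args) {
        reset();
        Window w = new Window(0, 10);
        processElement("a", w); // on time: contents and cleanup timer are created
        currentWatermark = 20;  // the window is now past its cleanup time
        processElement("b", w); // late: all existing state is dropped
        // prints "true true true 2"
        System.out.println(windowContents.isEmpty() + " " + cleanupTimers.isEmpty() + " "
                + mergingWindows.isEmpty() + " " + persistCalls);
    }
}
```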

          githubbot ASF GitHub Bot added a comment -

          Github user sunjincheng121 commented on a diff in the pull request:

          https://github.com/apache/flink/pull/3535#discussion_r106065855

          — Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java —
          @@ -354,22 +354,27 @@ (same quoted hunk as in the first review comment above)
          -- End diff --

          Yes, in fact, I had seen that, but I did not realize that it could be deleted. Haha, you are very quick-witted.
          Anyway, thanks for the explanation.
          Best,
          SunJincheng

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3535

          @StephanEwen, do you think this is enough for Flink 1.2.1, or should we do the full fix from https://issues.apache.org/jira/browse/FLINK-5972, which throws an error if users have shrinking merging windows?
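FLINK-5972 is only described here as throwing an error when merging windows shrink. One possible shape for such a check, sketched under assumptions: windows are `[start, end)` intervals (a hypothetical `Window` record), and a merge counts as shrinking when the merge result fails to cover some window it replaces. This is an illustration, not the check that FLINK-5972 or #3587 actually implemented.

```java
import java.util.List;

// Hypothetical sketch of a "merging windows must not shrink" check; not the code that
// FLINK-5972 or pull request #3587 actually added.
public class MergeCoverageCheckSketch {

    /** A time window [start, end), standing in for Flink's TimeWindow. */
    record Window(long start, long end) {
        boolean covers(Window other) {
            return start <= other.start && end >= other.end;
        }
    }

    /** Throws if the merge result does not cover every window it replaces. */
    static void checkMergeDoesNotShrink(List<Window> toBeMerged, Window mergeResult) {
        for (Window w : toBeMerged) {
            if (!mergeResult.covers(w)) {
                throw new IllegalStateException("Merge result " + mergeResult
                        + " does not cover window " + w + "; merged windows must not shrink.");
            }
        }
    }

    public static void main(String[] args) {
        List<Window> windows = List.of(new Window(0, 5), new Window(3, 10));
        checkMergeDoesNotShrink(windows, new Window(0, 10)); // covers both: passes
        try {
            checkMergeDoesNotShrink(windows, new Window(0, 8)); // cuts off part of [3, 10)
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```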

          githubbot ASF GitHub Bot added a comment -

          Github user StephanEwen commented on the issue:

          https://github.com/apache/flink/pull/3535

          I cannot really assess what this change does in detail, that is, how it affects typical workloads. I would need a bit more context.

          The change looks small and innocent, and it seems to have decent tests.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha commented on the issue:

          https://github.com/apache/flink/pull/3535

          Closing in favour of #3587, which ensures that the situation this PR fixes cannot occur in the first place.

          githubbot ASF GitHub Bot added a comment -

          Github user aljoscha closed the pull request at:

          https://github.com/apache/flink/pull/3535

          aljoscha Aljoscha Krettek added a comment -

          FLINK-5972 is the more general fix for this.


            People

            • Assignee:
              aljoscha Aljoscha Krettek
              Reporter:
              aljoscha Aljoscha Krettek
            • Votes:
              0
              Watchers:
              3

              Dates

              • Created:
                Updated:
                Resolved:
