Flink / FLINK-33192

State memory leak in the Window Operator due to unregistered cleanup timer

Details

    Description

      I have encountered a state memory leak in the default window operator (WindowOperator). When a window's trigger fires and the window's contents are null, so no result is emitted, the cleanup timer for that window is not registered. This happens, for example, when the trigger fires immediately after the window is created. By that point the element has already been added to the window state, so without a cleanup timer the window is never cleaned up and its state lives forever.

      Steps to Reproduce:

      1. Write a custom trigger that fires on every element.
      2. Write a custom aggregate function that never produces a result (its getResult returns null).
      3. Use a default tumbling event time window with this custom trigger and aggregate function.
      4. Publish events spanning multiple time windows.
      5. The window state still contains all the windows, even after their cleanup time has passed.
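      The reproduction can be modelled without a Flink cluster. The Java sketch below is a hypothetical, simplified model of the non-merging branch of WindowOperator.processElement, not Flink code: LeakModel, its fields, and WINDOW_SIZE are illustrative names, the window state is modelled as a map keyed by window start, and registered cleanup timers as a sorted set. It assumes a trigger that fires (without purging) on every element and an aggregate whose result is always null, and it uses the event-time cleanup time window.maxTimestamp() + allowedLateness with zero lateness.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical model of the buggy control flow (not Flink code). Windows are keyed by start. */
class LeakModel {
    static final long WINDOW_SIZE = 10;      // assumed tumbling window size
    static final long ALLOWED_LATENESS = 0;  // assumed allowed lateness

    // Stands in for windowState: window start -> appended elements.
    final Map<Long, List<Long>> windowState = new TreeMap<>();
    // Stands in for the timer service: window starts that have a cleanup timer.
    final TreeSet<Long> cleanupTimers = new TreeSet<>();

    // Assumed trigger (step 1): fires, without purging, on every element.
    boolean triggerFires(long timestamp) { return true; }

    // Assumed aggregate (step 2): never produces a result.
    Long aggregate(List<Long> elements) { return null; }

    void processElement(long timestamp) {
        long start = timestamp - Math.floorMod(timestamp, WINDOW_SIZE);
        // windowState.setCurrentNamespace(window); windowState.add(element.getValue());
        windowState.computeIfAbsent(start, k -> new ArrayList<>()).add(timestamp);
        if (triggerFires(timestamp)) {
            Long contents = aggregate(windowState.get(start));
            if (contents == null) {
                return;  // mirrors the 'continue' that skips registerCleanupTimer(window)
            }
            // emitWindowContents(window, contents) would run here
        }
        cleanupTimers.add(start);  // registerCleanupTimer(window)
    }

    // Fires every cleanup timer whose time (window end - 1 + lateness) is <= the watermark.
    void advanceWatermark(long watermark) {
        while (!cleanupTimers.isEmpty()) {
            long start = cleanupTimers.first();
            long cleanupTime = start + WINDOW_SIZE - 1 + ALLOWED_LATENESS;
            if (cleanupTime > watermark) {
                break;
            }
            cleanupTimers.pollFirst();
            windowState.remove(start);  // the cleanup timer clears the window's state
        }
    }

    public static void main(String[] args) {
        LeakModel model = new LeakModel();
        for (long ts : new long[] {1, 5, 12, 25, 33}) {  // events spanning four windows
            model.processElement(ts);
        }
        model.advanceWatermark(100);  // past every window's cleanup time
        System.out.println("windows left in state: " + model.windowState.keySet());
        System.out.println("cleanup timers registered: " + model.cleanupTimers.size());
    }
}
```

      Running main prints the window starts [0, 10, 20, 30] still in state and zero registered cleanup timers, even though the watermark has passed every window's cleanup time.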

      Code with the bug:

      https://github.com/apache/flink/blob/cd95b560d0c11a64b42bf6b98107314d32a4de86/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L398-L417

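      for (W window : elementWindows) {
          // ... (late-window check omitted)
          windowState.setCurrentNamespace(window);
          windowState.add(element.getValue());

          // ... (trigger context setup omitted)
          TriggerResult triggerResult = triggerContext.onElement(element);

          if (triggerResult.isFire()) {
              ACC contents = windowState.get();
              if (contents == null) {
                  // Skips both the purge check and registerCleanupTimer(window) below.
                  continue;
              }
              emitWindowContents(window, contents);
          }

          if (triggerResult.isPurge()) {
              windowState.clear();
          }
          registerCleanupTimer(window);
      }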

      Expected Result:

      The cleanup timer should be registered for every window that is added to the window state, regardless of whether the window emits a result when its trigger fires.

      Actual Result:

      When the trigger fires and the window contents are null, no result is emitted and the continue statement skips registerCleanupTimer(window). The state already created for that window is therefore never cleared and lives on indefinitely.

      Impact:

      This issue caused a large state memory leak in our applications and was very difficult to identify.

      Fix:

      There are two ways to fix this issue. I'm willing to create a PR with the fix if approved.

      1. Register the cleanup timer immediately after a window is added to the state.

      windowState.setCurrentNamespace(window);
      windowState.add(element.getValue());
      registerCleanupTimer(window);

      2. Emit the result only when the contents are not null, and remove the continue statement so that the purge check and registerCleanupTimer(window) always run.

      if (triggerResult.isFire()) {
          ACC contents = windowState.get();
          if (contents != null) {         
              emitWindowContents(window, contents);
          }
      } 
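      To compare the two proposed fixes, here is a hypothetical, simplified Java model (FixModel, Variant, and the fields are illustrative names, not Flink APIs) that replays the reproduction against each fix. As in the reproduction, it assumes a trigger that fires without purging on every element and an aggregate whose result is always null; the window state is modelled as a map keyed by window start and the timer service as a sorted set of window starts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical model of the two proposed fixes (not Flink code). Windows are keyed by start. */
class FixModel {
    enum Variant { FIX_1_REGISTER_EARLY, FIX_2_NO_CONTINUE }

    static final long WINDOW_SIZE = 10;  // assumed tumbling window size, zero allowed lateness

    final Variant variant;
    final Map<Long, List<Long>> windowState = new TreeMap<>();  // stands in for windowState
    final TreeSet<Long> cleanupTimers = new TreeSet<>();         // stands in for the timer service
    final List<Long> emitted = new ArrayList<>();                // stands in for the output

    FixModel(Variant variant) { this.variant = variant; }

    // Assumed aggregate that never produces a result.
    Long aggregate(List<Long> elements) { return null; }

    void processElement(long timestamp) {
        long start = timestamp - Math.floorMod(timestamp, WINDOW_SIZE);
        // windowState.setCurrentNamespace(window); windowState.add(element.getValue());
        windowState.computeIfAbsent(start, k -> new ArrayList<>()).add(timestamp);
        if (variant == Variant.FIX_1_REGISTER_EARLY) {
            cleanupTimers.add(start);  // fix 1: registerCleanupTimer(window) right after the add
        }
        // The assumed trigger fires on every element, so windowState.get() is always evaluated.
        Long contents = aggregate(windowState.get(start));
        if (contents == null && variant == Variant.FIX_1_REGISTER_EARLY) {
            return;  // fix 1 keeps the 'continue'; the timer is already registered
        }
        if (contents != null) {
            emitted.add(contents);  // emitWindowContents(window, contents)
        }
        // The assumed trigger never purges, so the isPurge() branch is omitted.
        cleanupTimers.add(start);  // registerCleanupTimer(window); re-adding is a no-op here
    }

    // Fires every cleanup timer whose time (window end - 1) is <= the watermark.
    void advanceWatermark(long watermark) {
        while (!cleanupTimers.isEmpty()
                && cleanupTimers.first() + WINDOW_SIZE - 1 <= watermark) {
            windowState.remove(cleanupTimers.pollFirst());  // cleanup timer clears the window
        }
    }

    public static void main(String[] args) {
        for (Variant v : Variant.values()) {
            FixModel model = new FixModel(v);
            for (long ts : new long[] {1, 5, 12, 25, 33}) {
                model.processElement(ts);
            }
            model.advanceWatermark(100);
            System.out.println(v + ": windows left in state = " + model.windowState.keySet());
        }
    }
}
```

      With either fix, advancing the watermark past the cleanup times empties the modelled state. One difference follows from the excerpt above: under fix 1 the continue statement is kept, so when the contents are null a trigger result that also purges would still skip windowState.clear(), and the window would be cleared only by its cleanup timer. Fix 2 lets the purge check run as well.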

          People

            Kartikey Pant (kartikeypant)
            Vidya Sagar Kalvakunta (vkalvakunta)
            Votes: 0
            Watchers: 6
