Flink / FLINK-35737

Prevent Memory Leak by Closing MemoryExecutionGraphInfoStore on MiniCluster Shutdown


Details

    Description

      MemoryExecutionGraphInfoStore registers a JVM shutdown hook in its constructor and removes that hook in its close() method.

      public MemoryExecutionGraphInfoStore(...) {
          ...
          this.shutdownHook = ShutdownHookUtil.addShutdownHook(this, getClass().getSimpleName(), LOG);
      }
      
      @Override
      public void close() throws IOException {
          ...
          // Remove shutdown hook to prevent resource leaks
          ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);
      }

      Currently, MiniCluster creates a MemoryExecutionGraphInfoStore inline while building the DispatcherResourceManagerComponent. It keeps no reference to the store, so it never calls close() on it during its own shutdown.

      final DispatcherResourceManagerComponent dispatcherResourceManagerComponent =
              dispatcherResourceManagerComponentFactory.create(
                      ...
                      new MemoryExecutionGraphInfoStore(),  // created inline; no reference is kept
                      ...);

      As a result, each such store leaves its shutdown hook registered, and the hooks accumulate when multiple Flink jobs run within the same local JVM. Every registered hook holds a reference to its store, so the stores and the objects they reference stay reachable until the JVM exits. This is a memory leak.
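
The accumulation described above can be reproduced with the plain JDK API. The following is a minimal sketch, not Flink code: `HookedStore`, `HookLeakDemo`, `leakedHooks` and `removeHookIfRegistered` are hypothetical names, and the store registers its hook directly through `Runtime` rather than through `ShutdownHookUtil`. It relies on `Runtime.removeShutdownHook` returning `true` only when the hook was still registered.

```java
import java.util.ArrayList;
import java.util.List;

/** Demonstrates shutdown hooks piling up when a hook-registering store is never closed. */
final class HookLeakDemo {

    /**
     * Simulates `runs` local job runs, each creating one store, and returns how many
     * shutdown hooks are still registered afterwards. All hooks are removed before returning.
     */
    static int leakedHooks(int runs, boolean closeStores) {
        // The list exists only so the demo can inspect the stores afterwards.
        List<HookedStore> created = new ArrayList<>();
        for (int i = 0; i < runs; i++) {
            HookedStore store = new HookedStore();
            if (closeStores) {
                store.close();
            }
            created.add(store);
        }
        int stillRegistered = 0;
        for (HookedStore store : created) {
            if (store.removeHookIfRegistered()) {
                stillRegistered++;
            }
        }
        return stillRegistered;
    }

    public static void main(String[] args) {
        System.out.println("hooks left without close(): " + leakedHooks(5, false));
        System.out.println("hooks left with close():    " + leakedHooks(5, true));
    }
}

/** Hypothetical stand-in for a store that registers a shutdown hook on construction. */
final class HookedStore implements AutoCloseable {
    private final Thread shutdownHook;

    HookedStore() {
        // The hook's Runnable captures `this`, so the JVM's hook registry keeps the store reachable.
        this.shutdownHook = new Thread(this::close, "HookedStore-shutdown-hook");
        Runtime.getRuntime().addShutdownHook(shutdownHook);
    }

    @Override
    public void close() {
        removeHookIfRegistered();
    }

    /** Removes the hook; returns true only if it was still registered. */
    boolean removeHookIfRegistered() {
        try {
            return Runtime.getRuntime().removeShutdownHook(shutdownHook);
        } catch (IllegalStateException e) {
            // The JVM is already shutting down; hooks can no longer be removed.
            return false;
        }
    }
}
```

Running `main` reports 5 hooks left when the stores are never closed and 0 when each store is closed after use.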

      This patch addresses the issue by ensuring that MemoryExecutionGraphInfoStore's close() method is invoked during MiniCluster shutdown.
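
The shape of such a fix can be sketched as follows. This is an illustration under assumptions, not the code from the pull request: `LocalClusterSketch`, `OwnedStore`, `store()` and `isHookRegistered()` are hypothetical names, and the real MiniCluster shutdown path may differ. The idea is only that the owner keeps a reference to the store it creates and closes it as part of its own shutdown.

```java
/** Hypothetical owner that retains its store and closes it on shutdown. */
final class LocalClusterSketch implements AutoCloseable {
    // Keep the reference instead of creating the store inline and discarding it.
    private final OwnedStore store = new OwnedStore();

    OwnedStore store() {
        return store;
    }

    @Override
    public void close() {
        // Other components would be stopped here; the store is closed with them.
        store.close();
    }

    public static void main(String[] args) {
        for (int run = 0; run < 3; run++) {
            OwnedStore used;
            try (LocalClusterSketch cluster = new LocalClusterSketch()) {
                used = cluster.store();
                System.out.println("run " + run + " active, hook registered: " + used.isHookRegistered());
            }
            System.out.println("run " + run + " closed, hook registered: " + used.isHookRegistered());
        }
    }
}

/** Hypothetical store that registers a shutdown hook and removes it in close(). */
final class OwnedStore implements AutoCloseable {
    private final Thread shutdownHook;
    private boolean hookRegistered;

    OwnedStore() {
        this.shutdownHook = new Thread(this::close, "OwnedStore-shutdown-hook");
        Runtime.getRuntime().addShutdownHook(shutdownHook);
        this.hookRegistered = true;
    }

    synchronized boolean isHookRegistered() {
        return hookRegistered;
    }

    @Override
    public synchronized void close() {
        if (!hookRegistered) {
            return; // already closed; close() is idempotent
        }
        hookRegistered = false;
        try {
            Runtime.getRuntime().removeShutdownHook(shutdownHook);
        } catch (IllegalStateException e) {
            // The JVM is already shutting down; the hook cannot be removed anymore.
        }
    }
}
```

With this ownership in place, repeated runs in one JVM leave no hook behind, because every store is closed by the component that created it.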

      https://github.com/apache/flink/pull/25009

            People

              Assignee: Unassigned
              Reporter: Feng Jiajie (fengjiajie)