Flink / FLINK-33981

File Descriptor References Not Released After Job Execution in MiniCluster Mode


Details

    Description

      When using MiniCluster mode, file descriptors for directories such as /tmp/minicluster_6e62ab17ab058e0d5df56b97e9ccef71/tm_0/localState are not released after a job completes. Running multiple jobs in the same JVM therefore accumulates leaked descriptors, which may eventually exhaust the process's open-file limit.
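      Leaked descriptors count against the JVM's per-process open-file limit. As a minimal sketch, assuming a HotSpot/OpenJDK JVM on a Unix-like OS, the JDK-specific com.sun.management.UnixOperatingSystemMXBean reports both the current number of open descriptors and the limit, so the remaining headroom can be watched while jobs run. The class name FdHeadroom is hypothetical and not part of Flink.

```java
import com.sun.management.UnixOperatingSystemMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

/** Hypothetical helper: reports how close this JVM is to its open-file limit. */
final class FdHeadroom {
    private FdHeadroom() {}

    /**
     * Returns {openDescriptors, maxDescriptors}, or null when the running JVM
     * does not expose Unix descriptor counts (e.g. on Windows or a non-HotSpot JVM).
     */
    static long[] openAndMax() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof UnixOperatingSystemMXBean) {
            UnixOperatingSystemMXBean unix = (UnixOperatingSystemMXBean) os;
            return new long[] {unix.getOpenFileDescriptorCount(), unix.getMaxFileDescriptorCount()};
        }
        return null;
    }
}
```

      Printing FdHeadroom.openAndMax() after each job would show whether the open count keeps climbing toward the limit as jobs are executed in the same JVM.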

      After running the reproduction code below and letting it reach the final sleep, lsof -p 18162 (18162 being the PID of the test JVM) shows:

      ...
      java    18162 sa_cluster   30r   DIR              253,1         0    1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
      java    18162 sa_cluster   31r   DIR              253,1         0    1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
      java    18162 sa_cluster   32r   DIR              253,1         0    1310787 /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
      java    18162 sa_cluster   33r   DIR              253,1         0    1310787 /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
      java    18162 sa_cluster   34r   DIR              253,1         0    1311960 /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
      java    18162 sa_cluster   35r   DIR              253,1         0    1311960 /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
      java    18162 sa_cluster   36r   DIR              253,1         0    1311974 /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
      java    18162 sa_cluster   37r   DIR              253,1         0    1311974 /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
      java    18162 sa_cluster   38r   DIR              253,1         0    1311979 /tmp/minicluster_8413f476b77245203ed1ef759eb0d2de/tm_0/localState (deleted)
      ...
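      Each entry above is a directory (DIR) that has already been removed from disk, marked (deleted), yet is still held open by the JVM. On Linux the same information is available from inside the process via the symlinks in /proc/self/fd. The following is a minimal, Linux-only sketch of an in-process equivalent of filtering lsof output; ProcFdScanner and its methods are hypothetical names, and the code makes no claim about which Flink code path opens these handles.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, Linux-only helper approximating `lsof -p <pid>` filtered by a path fragment. */
final class ProcFdScanner {
    private ProcFdScanner() {}

    /**
     * Returns "fd -> target" for every open descriptor of this JVM whose target contains
     * {@code needle}. For files or directories removed while still open, the kernel
     * appends " (deleted)" to the symlink target.
     */
    static List<String> openFdsMatching(String needle) throws IOException {
        List<String> hits = new ArrayList<>();
        try (DirectoryStream<Path> fds = Files.newDirectoryStream(Paths.get("/proc/self/fd"))) {
            for (Path fd : fds) {
                String target;
                try {
                    target = Files.readSymbolicLink(fd).toString();
                } catch (IOException closedMeanwhile) {
                    continue; // descriptor was closed between listing and reading
                }
                if (target.contains(needle)) {
                    hits.add(fd.getFileName() + " -> " + target);
                }
            }
        }
        return hits;
    }

    /** Counts matching descriptors whose target has already been deleted from disk. */
    static long countDeleted(String needle) throws IOException {
        return openFdsMatching(needle).stream().filter(s -> s.endsWith(" (deleted)")).count();
    }
}
```

      In the reproduction program, printing ProcFdScanner.countDeleted("localState") after each round should make it visible whether the number of leaked localState handles grows with every job.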
      

      The code used for reproduction is as follows:

      import org.apache.flink.api.common.JobExecutionResult;
      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.core.execution.JobClient;
      import org.apache.flink.streaming.api.datastream.DataStreamSource;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
      import org.apache.flink.streaming.api.graph.StreamGraph;
      
      import java.util.concurrent.CompletableFuture;
      import java.util.concurrent.TimeUnit;
      import java.util.concurrent.TimeoutException;
      
      /**
       * javac -cp 'lib/*' TestReleaseFd.java
       * java -Xmx600m -cp '.:lib/*' TestReleaseFd
       */
      public class TestReleaseFd {
      
        public static void main(String[] args) throws Exception {
          for (int i = 0; i < 10; ++i) {
            int round = i;
            Thread thread = new Thread(() -> {
              try {
                Configuration configuration = new Configuration();
                final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
                env.setParallelism(1);
      
                DataStreamSource<Long> longDataStreamSource = env.fromSequence(1, 100000);
                longDataStreamSource.addSink(new DiscardingSink<>());
      
                StreamGraph streamGraph = env.getStreamGraph();
                streamGraph.setJobName("test-" + System.nanoTime());
                JobClient jobClient = env.executeAsync(streamGraph);
      
                CompletableFuture<JobExecutionResult> jobExecutionResultCompletableFuture = jobClient.getJobExecutionResult();
                JobExecutionResult jobExecutionResult = null;
                while (jobExecutionResult == null) {
                  try {
                    jobExecutionResult = jobExecutionResultCompletableFuture.get(20, TimeUnit.SECONDS);
                  } catch (TimeoutException timeoutException) {
                    // job still running; keep polling
                  }
                }
                System.out.println("finished round: " + round);
                env.close();
              } catch (Exception e) {
                throw new RuntimeException(e);
              }
            });
      
            thread.setDaemon(true);
            thread.start();
            thread.join();
      
            System.out.println("done ... " + i);
          }
          
          // Keep the JVM alive so its open descriptors can be inspected, e.g. with lsof -p <pid>
          Thread.sleep(500_000_000);
        }
      }
       

      The issue reproduces consistently with the code above on Flink 1.18.0, but does not occur on Flink 1.14.6.


          People

            Feng Jiajie (fengjiajie)
            Votes: 0
            Watchers: 2
