Details
Bug
Status: Resolved
Major
Resolution: Fixed
1.18.0
Description
When using MiniCluster mode, file descriptors for directories such as /tmp/minicluster_6e62ab17ab058e0d5df56b97e9ccef71/tm_0/localState are not released after a job completes. Running multiple jobs in the same JVM therefore accumulates leftover file descriptors, which can eventually cause problems such as reaching the process's open-file limit.
After running the reproduction code below and waiting until it reaches the final sleep, lsof -p 18162 (18162 being the PID of the JVM in this run) shows:
...
java 18162 sa_cluster 30r DIR 253,1 0 1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
java 18162 sa_cluster 31r DIR 253,1 0 1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
java 18162 sa_cluster 32r DIR 253,1 0 1310787 /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
java 18162 sa_cluster 33r DIR 253,1 0 1310787 /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
java 18162 sa_cluster 34r DIR 253,1 0 1311960 /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
java 18162 sa_cluster 35r DIR 253,1 0 1311960 /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
java 18162 sa_cluster 36r DIR 253,1 0 1311974 /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
java 18162 sa_cluster 37r DIR 253,1 0 1311974 /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
java 18162 sa_cluster 38r DIR 253,1 0 1311979 /tmp/minicluster_8413f476b77245203ed1ef759eb0d2de/tm_0/localState (deleted)
...
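To quantify the leak from lsof output like the above, the entries can be grouped by directory. The following is only a rough sketch and not part of the original report: the class name LsofLeakSummary and its method are hypothetical, and it assumes the default lsof column layout (COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME) with paths that contain no whitespace.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink: summarizes leaked localState descriptors.
public class LsofLeakSummary {

    /** Counts, per path, the lsof entries that refer to a deleted localState directory. */
    static Map<String, Integer> countDeletedLocalState(List<String> lsofLines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lsofLines) {
            String[] f = line.trim().split("\\s+");
            // Assumed columns: COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME (deleted)
            if (f.length < 10) {
                continue; // header, "..." or otherwise unrelated line
            }
            boolean deleted = "(deleted)".equals(f[f.length - 1]);
            String name = f[8];
            if (deleted && "DIR".equals(f[4]) && name.endsWith("/localState")) {
                counts.merge(name, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Sample lines copied from the lsof output in this report.
        List<String> sample = List.of(
            "java 18162 sa_cluster 30r DIR 253,1 0 1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)",
            "java 18162 sa_cluster 31r DIR 253,1 0 1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)",
            "java 18162 sa_cluster 32r DIR 253,1 0 1310787 /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)");
        countDeletedLocalState(sample)
            .forEach((path, n) -> System.out.println(n + " " + path));
    }
}
```

In the excerpt above, most directories appear twice, which suggests that more than one descriptor per MiniCluster instance is left open.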
The code used for reproduction is as follows:
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.graph.StreamGraph;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * javac -cp 'lib/*' TestReleaseFd.java
 * java -Xmx600m -cp '.:lib/*' TestReleaseFd
 */
public class TestReleaseFd {

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 10; ++i) {
            int round = i;
            Thread thread = new Thread(() -> {
                try {
                    Configuration configuration = new Configuration();
                    final StreamExecutionEnvironment env =
                            StreamExecutionEnvironment.getExecutionEnvironment(configuration);
                    env.setParallelism(1);

                    DataStreamSource<Long> longDataStreamSource = env.fromSequence(1, 100000);
                    longDataStreamSource.addSink(new DiscardingSink<>());

                    StreamGraph streamGraph = env.getStreamGraph();
                    streamGraph.setJobName("test-" + System.nanoTime());
                    JobClient jobClient = env.executeAsync(streamGraph);

                    CompletableFuture<JobExecutionResult> jobExecutionResultCompletableFuture =
                            jobClient.getJobExecutionResult();
                    JobExecutionResult jobExecutionResult = null;
                    while (jobExecutionResult == null) {
                        try {
                            jobExecutionResult =
                                    jobExecutionResultCompletableFuture.get(20, TimeUnit.SECONDS);
                        } catch (TimeoutException timeoutException) {
                            // ignore
                        }
                    }
                    System.out.println("finished round: " + round);
                    env.close();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            thread.setDaemon(true);
            thread.start();
            thread.join();
            System.out.println("done ... " + i);
        }

        // ======================= lsof -p 18162
        Thread.sleep(500_000_000);
    }
}
The issue reproduces consistently with the above code on Flink 1.18.0, but does not occur on Flink 1.14.6.
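To compare versions without attaching lsof by hand, the leaked descriptors could also be counted from inside the JVM. The sketch below is an assumption-laden illustration rather than anything from the original report: OpenFdProbe is a hypothetical class, it is Linux-only because it reads /proc/self/fd, and it simply matches a substring against each descriptor's link target.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper, not part of Flink. Linux-only: relies on /proc/self/fd.
public class OpenFdProbe {

    /** Returns how many of this JVM's open descriptors resolve to a target containing needle. */
    static int countOpenFds(String needle) throws IOException {
        Path fdDir = Paths.get("/proc/self/fd");
        int count = 0;
        try (DirectoryStream<Path> fds = Files.newDirectoryStream(fdDir)) {
            for (Path fd : fds) {
                try {
                    // Each entry is a symlink to the file, directory, socket, etc. behind the descriptor.
                    String target = Files.readSymbolicLink(fd).toString();
                    if (target.contains(needle)) {
                        count++;
                    }
                } catch (IOException e) {
                    // The descriptor was closed between listing and reading the link; skip it.
                }
            }
        }
        return count;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("open localState descriptors: " + countOpenFds("localState"));
    }
}
```

Calling OpenFdProbe.countOpenFds("localState") after each round of the reproduction (for example next to the "done ... " print) would show whether the count grows with every job on a given Flink version.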