Details
Type: Bug
Status: Open
Priority: Major
Resolution: Unresolved
Affects Version/s: 1.15.0, 1.13.6, 1.14.3
Fix Version/s: None
Component/s: None
Description
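In StreamExecutionEnvironment#createFileInput, env.getParallelism() is passed to ContinuousFileMonitoringFunction as the parallelism of the downstream readers. This value is wrong whenever the user explicitly sets a different parallelism on the readers, because that override is applied to the returned stream only after the monitoring function has already been constructed with the environment's value.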
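To make the mechanism concrete, here is a minimal plain-Java sketch (no Flink dependency) of why the override is lost: the reader parallelism is copied into the monitoring function as a plain int when the source is built, and the later setParallelism(5) call only changes the reader operator. ReaderOperator, Monitor and observe are hypothetical stand-ins, not Flink classes, and the "lazy" variant only illustrates one possible direction; it is not how Flink resolves this issue.

```java
import java.util.function.IntSupplier;

// Hypothetical, simplified model of the bug. None of these classes are Flink APIs.
public class EagerParallelismDemo {

    // Stands in for the downstream reader operator, whose parallelism the
    // user may still change after the source has been created.
    static final class ReaderOperator {
        int parallelism;

        ReaderOperator(int parallelism) { this.parallelism = parallelism; }

        ReaderOperator setParallelism(int p) {
            this.parallelism = p;
            return this;
        }
    }

    // Stands in for the monitoring function: it only knows the reader
    // parallelism through whatever it was given at construction time.
    static final class Monitor {
        private final IntSupplier readerParallelism;

        Monitor(IntSupplier readerParallelism) { this.readerParallelism = readerParallelism; }

        int readerParallelism() { return readerParallelism.getAsInt(); }
    }

    // Builds the source, then applies the user's override, and reports what
    // an eager copy and a lazy lookup each see: {eager, lazy}.
    static int[] observe(int envParallelism, int userReaderParallelism) {
        ReaderOperator readers = new ReaderOperator(envParallelism);

        // Eager: the environment value is copied when the source is built.
        Monitor eager = new Monitor(() -> envParallelism);
        // Lazy (hypothetical alternative): read from the reader operator on demand.
        Monitor lazy = new Monitor(() -> readers.parallelism);

        readers.setParallelism(userReaderParallelism); // e.g. .setParallelism(5)

        return new int[] {eager.readerParallelism(), lazy.readerParallelism()};
    }

    public static void main(String[] args) {
        int[] seen = observe(1, 5); // createLocalEnvironment(1), readers set to 5
        System.out.println("eager sees " + seen[0]);
        System.out.println("lazy sees " + seen[1]);
    }
}
```

Running main prints "eager sees 1" followed by "lazy sees 5", mirroring the mismatch described above.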
For example, in the test below the environment parallelism is 1, so 1 is passed as readerParallelism, while the downstream readers actually run with parallelism 5. As a result only one split is generated for five readers, so at most one of them reads any data.
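```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.junit.Test;

public class ContinuousFileMonitoringFunctionParallelismTest { // hypothetical class name

    @Test
    public void testContinuousFileMonitoringFunction() throws Exception {
        // The environment default parallelism is 1 ...
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

        final String fileContent = "line1\n" + "line2\n" + "line3\n" + "line4\n" + "line5\n";
        final File file = createTempFile(fileContent);

        // ... but the readers and the sink are explicitly set to parallelism 5.
        env.readTextFile(file.getPath())
                .name("TextSource")
                .setParallelism(5)
                .forward()
                .addSink(new PrintSinkFunction<>())
                .setParallelism(5);

        env.execute();
    }

    private File createTempFile(String content) throws IOException {
        File tempFile = File.createTempFile("test_contents", "tmp");
        tempFile.deleteOnExit();
        // try-with-resources closes the writer even if write() throws
        try (Writer wrt = new OutputStreamWriter(new FileOutputStream(tempFile), StandardCharsets.UTF_8)) {
            wrt.write(content);
        }
        return tempFile;
    }
}
```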
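The effect on split generation can be approximated with the simplified model below. It assumes that the reader parallelism is used as the minimum number of splits, that splits are equal-sized byte ranges, and that splits reach the readers round-robin. splits and busyReaders are hypothetical helpers; Flink's actual FileInputFormat split computation also considers block sizes and other limits, so treat this as an illustration only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model; not Flink's FileInputFormat split logic.
public class SplitDistributionDemo {

    // Cuts a file of `length` bytes into contiguous splits of at most
    // ceil(length / minNumSplits) bytes each and returns the split lengths.
    static List<Long> splits(long length, int minNumSplits) {
        if (minNumSplits < 1) {
            throw new IllegalArgumentException("minNumSplits must be >= 1");
        }
        long splitSize = (length + minNumSplits - 1) / minNumSplits; // ceiling division
        List<Long> result = new ArrayList<>();
        for (long pos = 0; pos < length; pos += splitSize) {
            result.add(Math.min(splitSize, length - pos));
        }
        return result;
    }

    // Hands splits to reader subtasks round-robin (an assumption of this model)
    // and counts how many readers receive at least one split.
    static int busyReaders(int numSplits, int readerParallelism) {
        int[] assigned = new int[readerParallelism];
        for (int i = 0; i < numSplits; i++) {
            assigned[i % readerParallelism]++;
        }
        int busy = 0;
        for (int count : assigned) {
            if (count > 0) {
                busy++;
            }
        }
        return busy;
    }

    public static void main(String[] args) {
        // Same content as the test above: five 6-byte lines, 30 bytes in total.
        long fileLength = "line1\nline2\nline3\nline4\nline5\n".length();
        int actualReaders = 5;

        // Compare the hint the monitoring function receives today (1)
        // with the readers' real parallelism (5).
        for (int hint : new int[] {1, actualReaders}) {
            int n = splits(fileLength, hint).size();
            System.out.println("readerParallelism=" + hint + " -> " + n + " split(s), "
                    + busyReaders(n, actualReaders) + " of " + actualReaders + " readers busy");
        }
    }
}
```

For the 30-byte file from the test, main reports 1 split (1 of 5 readers busy) with a hint of 1, and 5 splits (5 of 5 readers busy) with a hint of 5.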