Status: Resolved
Resolution: Fixed
1.15.3, 1.16.1
When a job creates multiple sources that use the SourceCoordinator (FLIP-27), there is a race condition on failure that results in a "leak" of ExecutionVertexVersion objects due to a "queue" of pending global failures.
This results in the Job Manager becoming unresponsive.
Reproduction Steps
This can be reproduced by a job that creates multiple sources that fail in the SplitEnumerator. We observed this with multiple KafkaSource instances trying to load a non-existent cert from the file system and throwing FileNotFoundException. Here is a simple job to reproduce it (BE WARNED: running this locally will lock up your IDE):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setRestartStrategy(
        new RestartStrategies.FailureRateRestartStrategyConfiguration(
                10000, Time.of(10, TimeUnit.SECONDS), Time.of(10, TimeUnit.SECONDS)));

KafkaSource<String> source = KafkaSource.<String>builder()
        .setProperty("security.protocol", "SASL_SSL")
        // SSL configurations
        // Configure the path of truststore (CA) provided by the server
        .setProperty("ssl.truststore.location", "/path/to/kafka.client.truststore.jks")
        .setProperty("ssl.truststore.password", "test1234")
        // Configure the path of keystore (private key) if client authentication is required
        .setProperty("ssl.keystore.location", "/path/to/kafka.client.keystore.jks")
        .setProperty("ssl.keystore.password", "test1234")
        // SASL configurations
        // Set SASL mechanism as SCRAM-SHA-256
        .setProperty("sasl.mechanism", "SCRAM-SHA-256")
        // Set JAAS configurations
        .setProperty("sasl.jaas.config", " required username=\"username\" password=\"password\";")
        .setBootstrapServers("http://localhost:3456")
        .setTopics("input-topic")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

List<SingleOutputStreamOperator<String>> sources = IntStream.range(0, 32)
        .mapToObj(i -> env
                .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source " + i).uid("source-" + i)
                .keyBy(s -> s.charAt(0))
                .map(s -> s))
        .collect(Collectors.toList());

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").uid("source")
        .keyBy(s -> s.charAt(0))
        .union(sources.toArray(new SingleOutputStreamOperator[] {}))
        .print();

env.execute("test job");
Root Cause
The OperatorCoordinatorHolder already has a debounce mechanism, but the DefaultScheduler does not. Since the DefaultScheduler handles many OperatorCoordinatorHolder instances, each of which can report its own global failure, it needs a debounce mechanism of its own.
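To illustrate what debouncing at the scheduler level could look like, here is a minimal sketch. It is not the actual Flink code or the proposed patch: GlobalFailureDebouncer, reportGlobalFailure, failoverCompleted and suppressedCount are hypothetical names made up for this example. The idea is that while one global failover is pending, further global failures reported by other coordinators are counted and dropped instead of being queued as additional restarts.

```java
import java.util.function.Consumer;

/**
 * Hypothetical helper (NOT a Flink class): collapses a burst of global
 * failures from many coordinators into a single failover request.
 */
final class GlobalFailureDebouncer {
    // Callback that would start the global failover (assumed; stands in for the scheduler).
    private final Consumer<Throwable> failoverHandler;
    private boolean failoverPending;
    private int suppressedCount;

    GlobalFailureDebouncer(Consumer<Throwable> failoverHandler) {
        this.failoverHandler = failoverHandler;
    }

    /** Returns true if this report triggered a failover, false if it was suppressed. */
    synchronized boolean reportGlobalFailure(Throwable cause) {
        if (failoverPending) {
            // A failover is already in flight; do not queue another one.
            suppressedCount++;
            return false;
        }
        failoverPending = true;
        failoverHandler.accept(cause);
        return true;
    }

    /** To be called once the restart has finished, so new failures are handled again. */
    synchronized void failoverCompleted() {
        failoverPending = false;
    }

    synchronized int suppressedCount() {
        return suppressedCount;
    }
}
```

With the 33 sources from the reproduction job all failing at about the same time, only the first report would reach the handler in this sketch, and the other 32 would be suppressed until failoverCompleted() is called.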
I have managed to fix this and will open a PR, but I would need feedback from people who understand this code better than I do!