Flink / FLINK-31041

Build up of pending global failures causes JM instability


Details

    Description

      Context

      When a job creates multiple sources that use the SourceCoordinator (FLIP-27), a race condition between their failures results in a "leak" of ExecutionVertexVersion objects, caused by a "queue" of pending global failures.

      This results in the Job Manager becoming unresponsive.

      Reproduction Steps

      This can be reproduced by a job that creates multiple sources that fail in the SplitEnumerator. We observed this with multiple KafkaSources trying to load a non-existent certificate from the file system and throwing a FileNotFoundException (FNFE). Here is a simple job that reproduces it (BE WARNED: running this locally will lock up your IDE):

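      import org.apache.flink.api.common.eventtime.WatermarkStrategy;
      import org.apache.flink.api.common.restartstrategy.RestartStrategies;
      import org.apache.flink.api.common.serialization.SimpleStringSchema;
      import org.apache.flink.api.common.time.Time;
      import org.apache.flink.connector.kafka.source.KafkaSource;
      import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
      import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

      import java.util.List;
      import java.util.concurrent.TimeUnit;
      import java.util.stream.Collectors;
      import java.util.stream.IntStream;

      // Class name is arbitrary
      public class Flink31041Repro {
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.setParallelism(1);
              // Failure-rate restart strategy: up to 10000 failures per 10 s interval, 10 s delay between restarts
              env.setRestartStrategy(new RestartStrategies.FailureRateRestartStrategyConfiguration(
                      10000, Time.of(10, TimeUnit.SECONDS), Time.of(10, TimeUnit.SECONDS)));

              // The truststore/keystore paths below do not exist, so each source's
              // SplitEnumerator fails with a FileNotFoundException
              KafkaSource<String> source = KafkaSource.<String>builder()
                      .setProperty("security.protocol", "SASL_SSL")
                      // SSL configurations
                      // Configure the path of truststore (CA) provided by the server
                      .setProperty("ssl.truststore.location", "/path/to/kafka.client.truststore.jks")
                      .setProperty("ssl.truststore.password", "test1234")
                      // Configure the path of keystore (private key) if client authentication is required
                      .setProperty("ssl.keystore.location", "/path/to/kafka.client.keystore.jks")
                      .setProperty("ssl.keystore.password", "test1234")
                      // SASL configurations
                      // Set SASL mechanism as SCRAM-SHA-256
                      .setProperty("sasl.mechanism", "SCRAM-SHA-256")
                      // Set JAAS configurations
                      .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";")
                      // Kafka expects host:port pairs here, without a URL scheme
                      .setBootstrapServers("localhost:3456")
                      .setTopics("input-topic")
                      .setGroupId("my-group")
                      .setStartingOffsets(OffsetsInitializer.earliest())
                      .setValueOnlyDeserializer(new SimpleStringSchema())
                      .build();

              // 32 keyed sources...
              List<SingleOutputStreamOperator<String>> sources = IntStream.range(0, 32)
                      .mapToObj(i -> env
                              .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source " + i).uid("source-" + i)
                              .keyBy(s -> s.charAt(0))
                              .map(s -> s))
                      .collect(Collectors.toList());

              // ...plus one more source, unioned with the others (33 sources in total)
              env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").uid("source")
                      .keyBy(s -> s.charAt(0))
                      .union(sources.toArray(new SingleOutputStreamOperator[] {}))
                      .print();

              env.execute("test job");
          }
      }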

      Root Cause

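      The OperatorCoordinatorHolder already has a debounce mechanism, but the DefaultScheduler does not. Because a single DefaultScheduler handles many OperatorCoordinatorHolders, it needs its own debounce mechanism: debouncing inside each holder cannot stop failures reported by different holders from piling up as pending global failures.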
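      The difference between per-holder and scheduler-level debouncing can be illustrated with a toy model. This is a minimal sketch under stated assumptions, not Flink's implementation or the proposed PR: ToyScheduler, ToyCoordinatorHolder, handleGlobalFailure, failJob, onRestartCompleted and simulate are all hypothetical names. It assumes each holder forwards at most one failure of its own coordinator, while the scheduler alone decides whether another global failover is triggered.

```java
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model, not Flink code: many coordinator holders report failures to one scheduler.
 * All class and method names below are hypothetical.
 */
public class GlobalFailureDebounceSketch {

    /** Hypothetical stand-in for the scheduler that receives global failures. */
    static final class ToyScheduler {
        private final boolean debounce;
        private boolean globalFailurePending = false;
        int failoversTriggered = 0;

        ToyScheduler(boolean debounce) {
            this.debounce = debounce;
        }

        /** Called by each coordinator holder when its coordinator fails the job. */
        void handleGlobalFailure(Throwable cause) {
            if (debounce && globalFailurePending) {
                // A global failover is already pending, so drop this duplicate request.
                return;
            }
            globalFailurePending = true;
            failoversTriggered++;
        }

        /** Assumed hook: once the restart has finished, new failures are accepted again. */
        void onRestartCompleted() {
            globalFailurePending = false;
        }
    }

    /** Hypothetical stand-in for a holder that debounces only its own coordinator. */
    static final class ToyCoordinatorHolder {
        private final ToyScheduler scheduler;
        private boolean failed = false;

        ToyCoordinatorHolder(ToyScheduler scheduler) {
            this.scheduler = scheduler;
        }

        void failJob(Throwable cause) {
            if (failed) {
                return; // Per-holder debounce: each holder forwards at most one failure.
            }
            failed = true;
            scheduler.handleGlobalFailure(cause);
        }
    }

    /** Every source's enumerator fails twice in the same attempt; returns failovers triggered. */
    static int simulate(int numSources, boolean schedulerDebounce) {
        ToyScheduler scheduler = new ToyScheduler(schedulerDebounce);
        List<ToyCoordinatorHolder> holders = new ArrayList<>();
        for (int i = 0; i < numSources; i++) {
            holders.add(new ToyCoordinatorHolder(scheduler));
        }
        Throwable cause = new FileNotFoundException("/path/to/kafka.client.truststore.jks");
        for (ToyCoordinatorHolder holder : holders) {
            holder.failJob(cause);
            holder.failJob(cause); // Suppressed by the holder's own flag.
        }
        return scheduler.failoversTriggered;
    }

    public static void main(String[] args) {
        // 33 sources, matching the reproduction job (32 in the list plus one more).
        System.out.println("without scheduler debounce: " + simulate(33, false));
        System.out.println("with scheduler debounce: " + simulate(33, true));
    }
}
```

      Running main prints 33 failovers without the scheduler-level flag and 1 with it: the per-holder flag only stops one coordinator from reporting twice, it cannot merge reports from different coordinators. When the scheduler should clear its flag (modelled here by the hypothetical onRestartCompleted, which the simulation never calls) is the design question a real fix would have to settle.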

      Fix

      I have a fix for this and will open a PR, but I would appreciate feedback from people who understand this code better than I do!

       

       

      Attachments

        1. flink-31041-heap-dump.png (98 kB, Danny Cranmer)
        2. test-restart-strategy.log (237 kB, Zhu Zhu)
        3. failovers.log (155 kB, Zhu Zhu)


          People

            huwh
            Danny Cranmer (dannycranmer)
