Flink / FLINK-21057

Streaming checkpointing with small interval leads app to hang


Details

    • Type: Bug
    • Status: Open
    • Priority: Not a Priority
    • Resolution: Unresolved
    • Affects Version: 1.11.3
    • Fix Version: None
    • Component: Connectors / Kafka
    • Environment:
      • streaming app
      • Flink cluster in standalone-job / application mode
      • Flink version 1.11.3
      • jobmanager --> 1 instance
      • taskmanager --> 1 instance
      • parallelism --> 2

    Description

      There is a simple streaming app with checkpointing enabled:

      • statebackend --> RocksDB
      • mode --> EXACTLY_ONCE

      Steps to reproduce (STRs):
      1. Run Flink cluster in standalone-job / application mode (with embedded streaming app)
      2. Get error
      3. Wait 1 min
      4. Stop Flink cluster
      5. Repeat steps 1 to 4 until the following error appears:

      taskmanager
      org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
      flink-kafka-mirror-maker-jobmanager     | 	at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1352) ~[?:?]
      flink-kafka-mirror-maker-jobmanager     | 	at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1260) ~[?:?]
      flink-kafka-mirror-maker-jobmanager     | 	at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) ~[?:?]
      flink-kafka-mirror-maker-jobmanager     | 	at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:572) ~[?:?]
      flink-kafka-mirror-maker-jobmanager     | 	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:564) ~[?:?]
      flink-kafka-mirror-maker-jobmanager     | 	at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:414) ~[?:?]
      flink-kafka-mirror-maker-jobmanager     | 	at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:312) ~[?:?]
      flink-kafka-mirror-maker-jobmanager     | 	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) ~[?:?]
      flink-kafka-mirror-maker-jobmanager     | 	at java.lang.Thread.run(Unknown Source) ~[?:?]
      

      After this error the app hangs.

      Please find below:

      • streaming app code base (example)
      • attached logs
        • jobmanager
        • taskmanager
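
      For reference, the standalone-job / application-mode cluster described in the environment section can be sketched with Docker Compose. This is a minimal sketch based on the official Flink image conventions; the image tag, job class name (com.example.MirrorApp), and jar path are hypothetical placeholders, not taken from the report:

      ```yaml
      # Hypothetical sketch of the 1 jobmanager + 1 taskmanager setup.
      version: "3"
      services:
        jobmanager:
          image: flink:1.11.3-scala_2.11
          # standalone-job starts the JobManager with the embedded app
          command: standalone-job --job-classname com.example.MirrorApp
          ports:
            - "8081:8081"
          environment:
            - |
              FLINK_PROPERTIES=
              jobmanager.rpc.address: jobmanager
          volumes:
            # jar placed on the classpath via usrlib
            - ./app.jar:/opt/flink/usrlib/app.jar
        taskmanager:
          image: flink:1.11.3-scala_2.11
          command: taskmanager
          environment:
            - |
              FLINK_PROPERTIES=
              jobmanager.rpc.address: jobmanager
              taskmanager.numberOfTaskSlots: 2
          volumes:
            - ./app.jar:/opt/flink/usrlib/app.jar
          depends_on:
            - jobmanager
      ```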

      Example

      App

      build.gradle (dependencies)
      ...
      ext {
        ...
        javaVersion = '11'
        flinkVersion = '1.12.0'
        scalaBinaryVersion = '2.11'
        ...
      }
      
      dependencies {
        ...
        implementation "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
        implementation "org.apache.flink:flink-clients_${scalaBinaryVersion}:${flinkVersion}"
        implementation "org.apache.flink:flink-statebackend-rocksdb_${scalaBinaryVersion}:${flinkVersion}"
        ...
      }
      
      App
      public static void main(String[] args) {
        ...
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
        env.setParallelism(2);
      
        env.enableCheckpointing(500); // small checkpoint interval: 500 ms
      
        env.setStateBackend(new RocksDBStateBackend("file:///xxx/config/checkpoints/rocksdb", true));
      
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000); // so the effective interval is at least 1 s
        env.getCheckpointConfig().setCheckpointTimeout(600000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
      
        FlinkKafkaConsumer<Record> consumer = createConsumer();
        FlinkKafkaProducer<Record> producer = createProducer();
      
        env
          .addSource(consumer)
          .uid("kafka-consumer")
          .addSink(producer)
          .uid("kafka-producer")
        ;
      
        env.execute();
      }
      
      public static FlinkKafkaConsumer<Record> createConsumer() {
        ...
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-source-1:9091");
        ... // nothing special
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // ignored when checkpointing is enabled; offsets are committed on checkpoints instead
      
        FlinkKafkaConsumer<Record> consumer = new FlinkKafkaConsumer<>("topic-1", new RecordKafkaDerSchema(), props);
      
        ... // RecordKafkaDerSchema --> custom schema is used to copy not only message body but message key too
        ... // SimpleStringSchema --> can be used instead to reproduce issue
      
        consumer.setStartFromGroupOffsets();
        consumer.setCommitOffsetsOnCheckpoints(true);
      
        return consumer;
      }
      
      public static FlinkKafkaProducer<Record> createProducer() {
        ...
        Properties props = new Properties();
        ...
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-target-1:9094");
        props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
        props.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
        props.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1000");
        props.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "2000");
        props.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "9000");
        props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "xxx"); // ignored due to expected behaviour - https://issues.apache.org/jira/browse/FLINK-17691
        props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "" + (15 * 60 * 1000)); // decreased from 1 hour to 15 mins; app is going to be restarted quickly
        ...
        FlinkKafkaProducer<Record> producer = new FlinkKafkaProducer<>("topic-1", new RecordKafkaSerSchema(true), props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
      
        ... // RecordKafkaSerSchema --> custom schema is used to copy not only message body but message key too
        ... // SimpleStringSchema --> can be used instead to reproduce issue
      
        return producer;
      }
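
      One broker-side setting worth checking with EXACTLY_ONCE sinks (an observation, not part of the report): the producer's transaction timeout must not exceed the broker's transaction.max.timeout.ms, whose default is 15 minutes, exactly the value the producer above uses. A sketch of the relevant broker configuration, with illustrative values:

      ```properties
      # Broker-side (server.properties) sketch; values are illustrative.
      # Must be >= the producer's transaction.timeout.ms (15 min above),
      # otherwise the broker rejects the producer's InitProducerId request.
      transaction.max.timeout.ms=900000
      # How long the transaction coordinator keeps an idle transactional id
      # before expiring it; once expired, a restarted producer reusing that
      # id can be fenced with "old epoch" style errors like the one reported.
      transactional.id.expiration.ms=604800000
      ```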
      

      Attachments

        1. jobmanager.log
          280 kB
          Nazar Volynets
        2. taskmanager.log
          2.09 MB
          Nazar Volynets

        People

          Assignee: Unassigned
          Reporter: Nazar Volynets (nvolynets)