Flink / FLINK-17327

Kafka unavailability could cause Flink TM shutdown

Details

    Description

      Steps to reproduce:

      1. Start a Flink 1.10 standalone cluster
      2. Run a Flink job which reads from one Kafka topic and writes to another topic, with exactly-once checkpointing enabled
      3. Stop all Kafka Brokers after a few successful checkpoints

      When Kafka brokers are down:

      1. org.apache.kafka.clients.NetworkClient reported that the connection to the broker could not be established.
      2. Flink then could not complete the snapshot due to "Timeout expired while initializing transactional state in 60000ms".
      3. After several snapshot failures, Flink reported "Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints."
      4. Eventually, Flink tried to cancel the task, but the cancellation did not succeed within 3 min. According to the logs, the consumer was cancelled, but the producer was still running.
      5. Flink then reported "Fatal error occurred while executing the TaskManager. Shutting it down..."
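
      The "Too many ongoing snapshots" message in step 3 points at a bounded pool of producers. The following is only a toy model of such a pool, not the connector's actual code: the class name, the id format, and the pool size of 5 are assumptions made for illustration. It shows how a fixed pool runs dry when transactions are started for each checkpoint but never committed or aborted.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model (hypothetical, not Flink code): a fixed pool of transactional ids.
final class ProducerPoolModel {
    private final Deque<String> availableIds = new ArrayDeque<>();

    ProducerPoolModel(int poolSize) {
        for (int i = 0; i < poolSize; i++) {
            availableIds.add("txn-id-" + i); // hypothetical id format
        }
    }

    // Called when a checkpoint starts a new transaction.
    String beginTransaction() {
        String id = availableIds.poll();
        if (id == null) {
            // Mirrors the wording of the message seen in the logs.
            throw new IllegalStateException("Too many ongoing snapshots.");
        }
        return id;
    }

    // Called when a transaction is committed or aborted.
    void release(String id) {
        availableIds.add(id);
    }

    int available() {
        return availableIds.size();
    }

    public static void main(String[] args) {
        ProducerPoolModel pool = new ProducerPoolModel(5); // assumed pool size
        // No transaction is ever released, as if the brokers were unreachable.
        for (int checkpoint = 1; checkpoint <= 6; checkpoint++) {
            try {
                String id = pool.beginTransaction();
                System.out.println("checkpoint " + checkpoint + " uses " + id);
            } catch (IllegalStateException e) {
                System.out.println("checkpoint " + checkpoint + " failed: " + e.getMessage());
            }
        }
    }
}
```

      In this model the first five checkpoints each take an id and the sixth fails, because nothing is ever handed back to the pool. Whether the real connector reaches the same state by exactly this path during a broker outage is not confirmed by the logs quoted here.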

      I will attach the logs to show the details. It is worth noting that if the task contains only a producer and no consumer, the behavior is different:

      1. org.apache.kafka.clients.NetworkClient reported that the connection to the broker could not be established.
      2. After delivery.timeout.ms (2 min by default), the producer reported: FlinkKafkaException: Failed to send data to Kafka: Expiring 4 record(s) for output-topic-0:120001 ms has passed since batch creation
      3. Flink tried to cancel the upstream tasks and created a new producer.
      4. The new producer, as expected, reported the same connectivity issue to the brokers.
      5. This continued until the Kafka brokers were back.
      6. Flink reported "Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints."
      7. Flink cancelled the tasks and restarted them.
      8. The job continued, and a new checkpoint succeeded.
      9. The TM kept running throughout this scenario.

      I set the Kafka transaction timeout to 1 hour just to avoid transaction timeouts during the test.
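
      For reference, a minimal sketch of producer properties with a one-hour transaction timeout could look like the following. The class name and the broker address are hypothetical; the property keys are the standard Kafka producer settings, and the exact properties used in the test are not shown in this report.

```java
import java.util.Properties;

// Hypothetical helper: builds Kafka producer properties with a 1 hour transaction timeout.
final class ProducerProps {
    // One hour in milliseconds, matching the timeout described above.
    static final long TRANSACTION_TIMEOUT_MS = 60L * 60L * 1000L;

    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("transaction.timeout.ms", Long.toString(TRANSACTION_TIMEOUT_MS));
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder broker address.
        Properties props = build("localhost:9092");
        props.list(System.out);
    }
}
```

      Such a Properties object would typically be passed to the Kafka producer used by the sink. Note that the broker-side transaction.max.timeout.ms (15 minutes by default) has to be at least as large as the producer's transaction.timeout.ms, so it would need to be raised as well.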

      To get a producer-only task, I called env.disableOperatorChaining(); in the second scenario.

      Attachments

        1. 0001-Change-version-to-2.4.2-ALJOSCHA.patch
          0.7 kB
          Aljoscha Krettek
        2. 0002-Don-t-abort-in-flight-transactions.patch
          2 kB
          Aljoscha Krettek
        3. Standalonesession.log
          951 kB
          Jun Qin
        4. TM_produer_only_task.log
          742 kB
          Jun Qin
        5. TM.log
          802 kB
          Jun Qin

            People

              aljoscha Aljoscha Krettek
              qinjunjerry Jun Qin