Flink / FLINK-9630

Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException


Details

    • Type: Bug
    • Status: Closed
    • Priority: Major
    • Resolution: Won't Do
    • Affects Version/s: 1.4.2, 1.5.0
    • Fix Version/s: None
    • Component/s: Connectors / Kafka
    • Environment: Linux 2.6, Java 8, Kafka broker 0.10.x

    Description

      When the Kafka topic gets deleted, during task startup Kafka09PartitionDiscoverer hits a TopicAuthorizationException in getAllPartitionsForTopics(), and it gets no chance to close the kafkaConsumer, resulting in a TCP connection leak (to the Kafka broker).

      This issue can bring down the whole Flink cluster, because in a default setup (fixed-delay restart with Integer.MAX_VALUE attempts) the JobManager will schedule the job to any TaskManager that has a free slot, and each attempt causes that TaskManager to leak a TCP connection. Eventually almost every TaskManager runs out of file handles, so no TaskManager can take a snapshot or accept new jobs, effectively stopping the whole cluster.
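
      As a mitigation (not a fix for the leak itself), the restart storm described above can be bounded by capping the fixed-delay restart attempts in flink-conf.yaml. The keys below are the standard Flink 1.x restart-strategy options; the values are illustrative only:

```yaml
restart-strategy: fixed-delay
# cap the attempts so a permanently failing job cannot leak connections forever
restart-strategy.fixed-delay.attempts: 10
restart-strategy.fixed-delay.delay: 10 s
```

      With a finite attempt count the job eventually fails terminally instead of cycling through every TaskManager indefinitely.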

       

      The leak happens when StreamTask.invoke() calls openAllOperators(), and FlinkKafkaConsumerBase.open() calls partitionDiscoverer.discoverPartitions(); when kafkaConsumer.partitionsFor(topic) in KafkaPartitionDiscoverer.getAllPartitionsForTopics() hits a TopicAuthorizationException, nobody catches it.

      Though StreamTask.open catches the exception and invokes the dispose() method of each operator, which eventually invokes FlinkKafkaConsumerBase.cancel(), that method does not close the kafkaConsumer in partitionDiscoverer, and does not even invoke partitionDiscoverer.wakeup(), because discoveryLoopThread is still null at that point.
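
      The leak pattern can be reproduced in isolation. Below is a minimal, self-contained sketch; FakeConsumer and the connection counter are hypothetical stand-ins for KafkaConsumer and its TCP connection (this is not Flink or Kafka code). It shows why guarding the partitionsFor() call with try-with-resources would prevent the leak:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class LeakDemo {
    // counts "open TCP connections"; a hypothetical stand-in for real sockets
    static final AtomicInteger openConnections = new AtomicInteger();

    // hypothetical stand-in for KafkaConsumer
    static class FakeConsumer implements AutoCloseable {
        FakeConsumer() { openConnections.incrementAndGet(); }

        java.util.List<String> partitionsFor(String topic) {
            // simulates the TopicAuthorizationException thrown during discovery
            throw new RuntimeException("TopicAuthorizationException: " + topic);
        }

        @Override
        public void close() { openConnections.decrementAndGet(); }
    }

    // mirrors the leaky shape of getAllPartitionsForTopics():
    // the exception escapes before close() is reached
    static void discoverLeaky(String topic) {
        FakeConsumer consumer = new FakeConsumer();
        consumer.partitionsFor(topic); // throws; the next line never runs
        consumer.close();
    }

    // guarded variant: try-with-resources closes the consumer on any exit path
    static void discoverSafe(String topic) {
        try (FakeConsumer consumer = new FakeConsumer()) {
            consumer.partitionsFor(topic);
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            try { discoverLeaky("some-topic"); } catch (RuntimeException ignored) {}
        }
        System.out.println("leaked after discoverLeaky: " + openConnections.get()); // 3

        openConnections.set(0);
        for (int i = 0; i < 3; i++) {
            try { discoverSafe("some-topic"); } catch (RuntimeException ignored) {}
        }
        System.out.println("leaked after discoverSafe: " + openConnections.get()); // 0
    }
}
```

      The same effect could be achieved with a try/finally around the metadata query; the point is only that the consumer must be closed on the exception path reached during open(), since cancel() never gets to it.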

       

      Below is the code of FlinkKafkaConsumerBase.cancel() for convenience:

      public void cancel() {
          // set ourselves as not running;
          // this would let the main discovery loop escape as soon as possible
          running = false;

          if (discoveryLoopThread != null) {

              if (partitionDiscoverer != null) {
                  // we cannot close the discoverer here, as it is error-prone to concurrent access;
                  // only wakeup the discoverer, the discovery loop will clean itself up after it escapes
                  partitionDiscoverer.wakeup();
              }

              // the discovery loop may currently be sleeping in-between
              // consecutive discoveries; interrupt to shutdown faster
              discoveryLoopThread.interrupt();
          }

          // abort the fetcher, if there is one
          if (kafkaFetcher != null) {
              kafkaFetcher.cancel();
          }
      }


          People

            Assignee: Unassigned
            Reporter: Youjun Yuan (ubyyj)
            Votes: 0
            Watchers: 5

            Dates

              Created:
              Updated:
              Resolved:

              Time Tracking

                Estimated: Not Specified
                Remaining: 0h
                Logged: 10m