[SPARK-24987] Kafka Cached Consumer Leaking File Descriptors

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 2.2.2, 2.3.0, 2.3.1
    • Fix Version/s: 2.3.2, 2.4.0
    • Component/s: Structured Streaming
    • Labels: None
    • Environment:

      Spark 2.3.1

      Java(TM) SE Runtime Environment (build 1.8.0_112-b15)
      Java HotSpot(TM) 64-Bit Server VM (build 25.112-b15, mixed mode)

       

      Description

      Setup:

      • Spark 2.3.1
      • Java 1.8.0 (112)
      • Standalone Cluster Manager
      • 3 Nodes, 1 Executor per node.

      Spark 2.3.0 introduced a new mechanism for caching Kafka consumers (SPARK-23623: https://issues.apache.org/jira/browse/SPARK-23623) via KafkaDataConsumer.acquire.

      It seems that there are situations (I've been trying to debug it, but haven't been able to find the root cause yet) where cached consumers remain "in use" throughout the lifetime of the task and are never released. This can be identified by the following line of the stack trace:

      at org.apache.spark.sql.kafka010.KafkaDataConsumer$.acquire(KafkaDataConsumer.scala:460)

      Which points to:

      } else if (existingInternalConsumer.inUse) {
        // If consumer is already cached but is currently in use, then return a new consumer
        NonCachedKafkaDataConsumer(newInternalConsumer)
      

      This means the existing consumer created for that `TopicPartition` is still marked as in use for some reason. The weird thing is that you can see this even for very old tasks which have already finished successfully.
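
      For illustration, here's a minimal self-contained sketch of the caching pattern the snippet above comes from: one consumer per topic-partition kept in a cache with an inUse flag, and a throw-away non-cached consumer handed out while the cached one is busy. The names are invented for this sketch and it is not the actual Spark code; it is only meant to show how a missed release leaves the cached entry permanently "in use":

      import scala.collection.mutable

      final case class Key(topic: String, partition: Int)

      final class PooledConsumer(val key: Key) {
        @volatile var inUse: Boolean = false
        def close(): Unit = () // would close the underlying KafkaConsumer
      }

      object ConsumerPool {
        private val cache = mutable.Map.empty[Key, PooledConsumer]

        // Returns the consumer plus a flag telling the caller whether it came from the cache.
        def acquire(key: Key): (PooledConsumer, Boolean) = synchronized {
          cache.get(key) match {
            case Some(c) if !c.inUse =>
              c.inUse = true
              (c, true)                        // reuse the cached consumer
            case Some(_) =>
              (new PooledConsumer(key), false) // cached one is busy: hand out a non-cached consumer
            case None =>
              val c = new PooledConsumer(key)
              c.inUse = true
              cache(key) = c
              (c, true)
          }
        }

        // If a task never calls release for its cached consumer, inUse stays true forever
        // and every later acquire for that partition falls into the non-cached branch,
        // which matches the behaviour described above.
        def release(consumer: PooledConsumer, cached: Boolean): Unit = synchronized {
          if (cached) consumer.inUse = false else consumer.close()
        }
      }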

      I've traced this leak down using file leak detector, attaching it to the running Executor JVM process. I've exported the list of open file descriptors, which you can find here; the majority of them are epoll FDs used by Kafka consumers, indicating that the consumers aren't being closed.
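
      As a rough way to observe the growth without external tooling (assuming a Linux executor host; the class below and the idea of polling from a thread are mine, not part of the original report), you can count the entries under /proc/<pid>/fd of the executor JVM and watch the number rise across micro-batches:

      import java.io.File
      import java.lang.management.ManagementFactory

      object FdMonitor {
        // Number of file descriptors currently open in this JVM (Linux only).
        def openFdCount(): Int = {
          val pid = ManagementFactory.getRuntimeMXBean.getName.split("@")(0)
          Option(new File(s"/proc/$pid/fd").listFiles()).map(_.length).getOrElse(-1)
        }

        def main(args: Array[String]): Unit = {
          while (true) {
            println(s"open fds: ${openFdCount()}")
            Thread.sleep(10000)
          }
        }
      }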

       Spark graph:

      kafkaStream
        .load()
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        .as[(String, String)]
        .flatMap {...}
        .groupByKey(...)
        .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(...)
        .writeStream
        .foreach(...)
        .outputMode(OutputMode.Update)
        .option("checkpointLocation", sparkConfiguration.properties.checkpointDirectory)
        .start()
        .awaitTermination()
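
      For completeness, here is a self-contained sketch of the same pipeline shape. The broker address, topic, the flatMap/state logic, and the ForeachWriter are placeholders I made up (the real job's logic is elided above); only the operator chain mirrors the actual query:

      import org.apache.spark.sql.{ForeachWriter, SparkSession}
      import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

      object KafkaStateLeakRepro {
        def main(args: Array[String]): Unit = {
          val spark = SparkSession.builder().appName("KafkaStateLeakRepro").getOrCreate()
          import spark.implicits._

          // Placeholder source; broker address and topic are assumptions.
          val kafkaStream = spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("subscribe", "events")

          kafkaStream
            .load()
            .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
            .as[(String, String)]
            // Placeholder transformation: split each value into words, keyed by the record key.
            .flatMap { case (k, v) => v.split("\\s+").map(w => (k, w)) }
            .groupByKey(_._1)
            // Placeholder state: keep a running count of records per key.
            .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout()) {
              (key: String, values: Iterator[(String, String)], state: GroupState[Long]) =>
                val count = state.getOption.getOrElse(0L) + values.size
                state.update(count)
                state.setTimeoutDuration("1 minute")
                (key, count)
            }
            .writeStream
            .outputMode(OutputMode.Update)
            // Placeholder sink: print each updated (key, count) pair.
            .foreach(new ForeachWriter[(String, Long)] {
              def open(partitionId: Long, version: Long): Boolean = true
              def process(value: (String, Long)): Unit = println(value)
              def close(errorOrNull: Throwable): Unit = ()
            })
            .option("checkpointLocation", "/tmp/kafka-state-leak-repro")
            .start()
            .awaitTermination()
        }
      }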

    People

    • Assignee: Yuval Itzchakov
    • Reporter: Yuval Itzchakov
    • Votes: 1
    • Watchers: 4
