Spark / SPARK-27818

Spark Structured Streaming executors fail with OutOfMemoryError due to KafkaMbeans


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 2.3.0
    • Fix Version/s: None
    • Labels:
      None
    • Environment:

      OS: Linux

      HDP: 2.6.5.0-292

      Spark 2.3.0.2.6.5.0-292

      Kafka 1.0.0.2.6.5.0-292.

    Description

      Checking the heap allocation with VisualVM shows that the memory used by the JMX MBean server grows linearly over time.

      Further investigation suggests that the JMX MBean server is filled with thousands of KafkaMbean instances holding metrics for client IDs of the form consumer-\d+. The number of distinct IDs runs into the thousands, equal to the number of tasks created on the executor.

      $KafkaMbean.objectName._canonicalName = kafka.consumer:client-id=consumer-\d+,type=consumer-metrics
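The leak can also be observed without a profiler by querying the MBean server directly. This is a minimal sketch, assuming it runs inside the executor JVM (MBeans are registered per process) and that Kafka registers its metrics in the platform MBean server, which is where VisualVM finds them. `KafkaMBeanProbe` and `consumerClientIds` are hypothetical names; `ManagementFactory.getPlatformMBeanServer`, `MBeanServer.queryNames` and `ObjectName.getKeyProperty` are standard JMX API.

```scala
import java.lang.management.ManagementFactory
import javax.management.ObjectName
import scala.collection.JavaConverters._

// Hypothetical helper, not part of Spark or Kafka.
object KafkaMBeanProbe {
  // Matches the MBean name reported above; the "*" value wildcard stands in
  // for the consumer-\d+ client id. The reported name has exactly these keys.
  val ConsumerMetricsPattern: ObjectName =
    new ObjectName("kafka.consumer:type=consumer-metrics,client-id=*")

  // Client ids of every consumer-metrics MBean currently registered in this
  // JVM's platform MBean server, sorted for stable output.
  def consumerClientIds(): Seq[String] = {
    val server = ManagementFactory.getPlatformMBeanServer
    server
      .queryNames(ConsumerMetricsPattern, null)
      .asScala
      .toSeq
      .map(_.getKeyProperty("client-id"))
      .sorted
  }
}
```

Sampling `consumerClientIds().size` periodically on an affected executor should show the same steady growth that VisualVM reports, if client ids are indeed never released.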
      

       

      Running the Kafka consumer on the executor with DEBUG logging shows that it adds thousands of metrics sensors and often removes none of them, or only some.

      I would expect the KafkaMbeans to be cleaned up once the task completes.

      However, they do not appear to be cleaned up when Spark logs the following message:

       

      [Executor task launch worker for task 42 | org.apache.spark.sql.kafka010.KafkaSourceRDD] INFO : Beginning offset 37 is the same as ending offset skipping extractBytesOutput 1
      

       

      According to the KafkaSourceRDD code, consumer.release() is not called in this case, so the KafkaMetrics for the corresponding task/consumer ID end up retained in the JMX MBean server.
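The failure mode can be modelled in isolation. The sketch below is hypothetical: `ConsumerPool`, `OffsetRange`, `computeLeaky` and `computeFixed` are illustrative stand-ins, not Spark's KafkaSourceRDD or Kafka's consumer API, and records are materialised eagerly for simplicity. It assumes, as the log message implies, that the consumer is acquired before the empty-range check. The leaky version then returns early without `release()`, while the fixed version releases in a `finally` block so every path frees the consumer.

```scala
import scala.collection.mutable

// Hypothetical model of a pooled consumer, NOT Spark's or Kafka's API.
final case class OffsetRange(fromOffset: Long, untilOffset: Long)

final class ConsumerPool {
  private var nextId = 0
  // Client ids still "registered", i.e. acquired but never released.
  val live: mutable.Set[String] = mutable.Set.empty[String]

  def acquire(): String = {
    nextId += 1
    val id = s"consumer-$nextId"
    live += id
    id
  }

  def release(id: String): Unit = live -= id
}

object EmptyRangeLeak {
  // Assumed shape of the path described above: acquire, then bail out early
  // on an empty range without releasing.
  def computeLeaky(pool: ConsumerPool, range: OffsetRange): Iterator[Long] = {
    val consumer = pool.acquire()
    if (range.fromOffset == range.untilOffset) {
      Iterator.empty // consumer never released: its id stays in pool.live
    } else {
      try {
        (range.fromOffset until range.untilOffset).toList.iterator
      } finally {
        pool.release(consumer)
      }
    }
  }

  // Same logic, but release() runs on every path, including the empty range.
  def computeFixed(pool: ConsumerPool, range: OffsetRange): Iterator[Long] = {
    val consumer = pool.acquire()
    try {
      if (range.fromOffset == range.untilOffset) Iterator.empty
      else (range.fromOffset until range.untilOffset).toList.iterator
    } finally {
      pool.release(consumer)
    }
  }
}
```

Running both versions over the ranges (37, 37), (37, 40) and (40, 40) leaves consumer-1 and consumer-3 registered in the leaky pool, one per empty range, and nothing in the fixed one.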

       

      Here is how I initialise the Structured Streaming query:

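      import org.apache.spark.sql.streaming.Trigger
      import sparkSession.implicits._ // Encoders needed by mapPartitions

      sparkSession
       .readStream
       .format("kafka")
       .options(Map("kafka.bootstrap.servers" -> KAFKA_BROKERS,
                    "subscribePattern" -> INPUT_TOPIC,
                    "startingOffsets" -> "earliest",
                    "failOnDataLoss" -> "false"))
       .load()                     // load() belongs on the reader, before any transformation
       .mapPartitions(processData) // user-defined; its output must carry a "value" column for the Kafka sink
       .writeStream
       .format("kafka")            // the Kafka sink also needs a "topic" option or a topic column
       .options(Map("kafka.bootstrap.servers" -> KAFKA_BROKERS,
                    "checkpointLocation" -> CHECKPOINT_LOCATION))
       .queryName("Process Data")
       .outputMode("update")
       .trigger(Trigger.ProcessingTime(1000))
       .start()
       .awaitTermination()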
      

       

      The Kafka partitions in question often have no data because the producer writes only sporadically, so many tasks hit the empty-range path shown in the log message above.


    People

    • Assignee: Unassigned
    • Reporter: Ruslan Taran (random.walker)
    • Votes: 2
    • Watchers: 4
