Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Duplicate
Affects Version/s: 2.3.0
Fix Version/s: None
Component/s: None
Environment:
OS: Linux
HDP: 2.6.5.0-292
Spark: 2.3.0.2.6.5.0-292
Kafka: 1.0.0.2.6.5.0-292
Description
Checking the heap allocation with VisualVM indicates that JMX MBean Server memory usage grows linearly with time.
Further investigation shows that the JMX MBean Server fills up with thousands of KafkaMbean instances holding metrics for consumer-\d+ client ids, their number growing into the thousands (equal to the number of tasks created on the executor).
$KafkaMbean.objectName._canonicalName = kafka.consumer:client-id=consumer-\d+,type=consumer-metrics
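For anyone who wants to reproduce the measurement, the leaked MBeans can be counted with the standard JMX API; a minimal sketch (it must run inside the affected executor JVM, or against it over a JMX connector):

import java.lang.management.ManagementFactory
import javax.management.ObjectName

object CountKafkaMbeans {
  def main(args: Array[String]): Unit = {
    val server = ManagementFactory.getPlatformMBeanServer
    // Property-value wildcard matching the canonical name quoted above,
    // i.e. every client-id of the form consumer-\d+
    val pattern = new ObjectName("kafka.consumer:type=consumer-metrics,client-id=*")
    val leaked = server.queryNames(pattern, null)
    println(s"KafkaMbean instances: ${leaked.size}")
  }
}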
Running the Kafka consumer with DEBUG logging on the executor shows that the executor registers thousands of metrics sensors and often removes none of them, or only some.
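(One way to enable that logging programmatically, assuming the log4j 1.x that ships with Spark 2.3; the logger name here is the whole org.apache.kafka namespace so that both the consumer and its metrics internals are covered:)

import org.apache.log4j.{Level, Logger}

// Raise Kafka client logging to DEBUG so sensor registration/removal shows up
Logger.getLogger("org.apache.kafka").setLevel(Level.DEBUG)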
I would expect the KafkaMbeans to be cleaned up once a task has completed, but they do not seem to be cleaned up when Spark produces the following message:
[Executor task launch worker for task 42 | org.apache.spark.sql.kafka010.KafkaSourceRDD] INFO : Beginning offset 37 is the same as ending offset skipping extractBytesOutput 1
According to the KafkaSourceRDD code, consumer.release() is not called in this case, which eventually results in the KafkaMetrics being retained in the JMX MBean Server for the corresponding task/consumer id.
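To make the failure mode concrete, here is a self-contained sketch of the suspected pattern (illustrative names only, not the actual Spark source): release() is wired to the record iterator, so an empty offset range that short-circuits to Iterator.empty never releases the consumer, and its metrics stay registered.

object ReleaseSkipSketch {
  // Stand-in for Spark's cached Kafka consumer; release() would unregister JMX metrics.
  final class CachedConsumer(val id: Int) {
    def poll(): Seq[String] = Seq(s"record-from-consumer-$id")
    def release(): Unit = println(s"released consumer-$id")
  }

  def compute(fromOffset: Long, untilOffset: Long, consumer: CachedConsumer): Iterator[String] =
    if (fromOffset == untilOffset) {
      // Early return: the consumer was acquired, but release() is never called
      // on this path, so its KafkaMbeans are retained forever.
      println(s"Beginning offset $fromOffset is the same as ending offset, skipping")
      Iterator.empty
    } else {
      val records = consumer.poll().iterator
      new Iterator[String] {
        private var released = false
        def hasNext: Boolean = {
          val more = records.hasNext
          if (!more && !released) { consumer.release(); released = true }
          more
        }
        def next(): String = records.next()
      }
    }

  def main(args: Array[String]): Unit = {
    compute(37L, 37L, new CachedConsumer(1)).foreach(println) // leaks consumer-1
    compute(0L, 5L, new CachedConsumer(2)).foreach(println)   // releases consumer-2
  }
}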
Here is how I initialise structured streaming:
sparkSession
  .readStream
  .format("kafka")
  .options(Map(
    "kafka.bootstrap.servers" -> KAFKA_BROKERS,
    "subscribePattern" -> INPUT_TOPIC,
    "startingOffsets" -> "earliest",
    "failOnDataLoss" -> "false"))
  .load()
  .mapPartitions(processData)
  .writeStream
  .format("kafka")
  .options(Map(
    "kafka.bootstrap.servers" -> KAFKA_BROKERS,
    "checkpointLocation" -> CHECKPOINT_LOCATION))
  .queryName("Process Data")
  .outputMode("update")
  .trigger(Trigger.ProcessingTime(1000))
  .start()
  .awaitTermination()
The Kafka partitions in question often have no data due to the sporadic nature of the producer.
Issue Links
- duplicates SPARK-24987 "Kafka Cached Consumer Leaking File Descriptors" (Resolved)