Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Duplicate
- Affects Version/s: 2.2.0
Environment: EMR 5.9.0 with Spark 2.2.0 and Hadoop 2.7.3
spark.blacklist.decommissioning.enabled true
spark.blacklist.decommissioning.timeout 1h
spark.cleaner.periodicGC.interval 10min
spark.default.parallelism 18
spark.dynamicAllocation.enabled false
spark.eventLog.enabled true
spark.executor.cores 3
spark.executor.extraJavaOptions -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'
spark.executor.id driver
spark.executor.instances 3
spark.executor.memory 22G
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
spark.hadoop.parquet.enable.summary-metadata false
spark.hadoop.yarn.timeline-service.enabled false
spark.jars
spark.master yarn
spark.memory.fraction 0.9
spark.memory.storageFraction 0.3
spark.memory.useLegacyMode false
spark.rdd.compress true
spark.resourceManager.cleanupExpiredHost true
spark.scheduler.mode FIFO
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.shuffle.service.enabled true
spark.speculation false
spark.sql.parquet.filterPushdown true
spark.sql.parquet.mergeSchema false
spark.sql.warehouse.dir hdfs:///user/spark/warehouse
spark.stage.attempt.ignoreOnDecommissionFetchFailure true
spark.submit.deployMode client
spark.yarn.am.cores 1
spark.yarn.am.memory 2G
spark.yarn.am.memoryOverhead 1G
spark.yarn.executor.memoryOverhead 3G
Description
There appears to be a memory issue in Structured Streaming. A stream with aggregation (dropDuplicates()) and data partitioning steadily increases its memory usage until the executors fail with exit code 137:
ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container marked as failed: container_1520214726510_0001_01_000003 on host: ip-10-0-1-153.us-west-2.compute.internal. Exit status: 137. Diagnostics: Container killed on request. Exit code is 137
Container exited with a non-zero exit code 137
Killed by external signal
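For readers unfamiliar with the status code: by shell convention an exit status above 128 encodes 128 plus the number of the signal that terminated the process, so 137 corresponds to signal 9 (SIGKILL). The following is a minimal sketch of that arithmetic, together with the per-executor container size implied by the settings above (spark.executor.memory plus spark.yarn.executor.memoryOverhead). The class and method names are hypothetical and exist only for illustration.

```java
public final class ExitCodes {
    private ExitCodes() {}

    /**
     * Returns the signal number encoded in a shell-style exit status,
     * or -1 if the status is not above 128 (i.e. not a signal exit).
     */
    public static int signalFromExitStatus(int exitStatus) {
        return exitStatus > 128 ? exitStatus - 128 : -1;
    }

    /** Container size requested per executor, in GiB: executor memory plus overhead. */
    public static int containerGiB(int executorMemoryGiB, int memoryOverheadGiB) {
        return executorMemoryGiB + memoryOverheadGiB;
    }

    public static void main(String[] args) {
        // Exit status taken from the log above.
        System.out.println("signal = " + signalFromExitStatus(137));      // prints signal = 9
        // Values from spark.executor.memory (22G) and spark.yarn.executor.memoryOverhead (3G).
        System.out.println("container = " + containerGiB(22, 3) + " GiB"); // prints container = 25 GiB
    }
}
```

Note that the status alone does not say who sent the signal: both a YARN kill of the container and the configured -XX:OnOutOfMemoryError='kill -9 %p' handler would end the process with SIGKILL.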
The stream is created roughly like this:
session
    .readStream()
    .schema(inputSchema)
    .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
    .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
    .csv("s3://test-bucket/input")
    .as(Encoders.bean(TestRecord.class))
    .flatMap(mf, Encoders.bean(TestRecord.class))
    .dropDuplicates("testId", "testName")
    .withColumn("year", functions.date_format(functions.col("testTimestamp").cast(DataTypes.DateType), "YYYY"))
    .writeStream()
    .option("path", "s3://test-bucket/output")
    .option("checkpointLocation", "s3://test-bucket/checkpoint")
    .trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS))
    .partitionBy("year")
    .format("parquet")
    .outputMode(OutputMode.Append())
    .queryName("test-stream")
    .start();
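A side note on the "year" partition column, unrelated to the memory growth: assuming date_format follows the java.text.SimpleDateFormat pattern letters (as the Spark 2.x documentation describes), "YYYY" denotes the week-based year, while "yyyy" denotes the calendar year. The two differ for a few days around New Year. The sketch below uses only the JDK to show the difference; the class name is hypothetical and the US locale and UTC time zone are assumptions made to keep the result deterministic.

```java
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.Locale;
import java.util.TimeZone;

public final class YearPatternCheck {
    private static final TimeZone UTC = TimeZone.getTimeZone("UTC");

    private YearPatternCheck() {}

    /** Formats noon UTC of the given calendar date with the given pattern (US locale). */
    public static String format(String pattern, int year, int month, int day) {
        Calendar cal = new GregorianCalendar(UTC, Locale.US);
        cal.clear();
        cal.set(year, month - 1, day, 12, 0, 0); // Calendar months are zero-based

        SimpleDateFormat fmt = new SimpleDateFormat(pattern, Locale.US);
        fmt.setTimeZone(UTC);
        return fmt.format(cal.getTime());
    }

    public static void main(String[] args) {
        // 2017-12-31 is a Sunday, which the US locale counts as the first day of week 1 of 2018.
        System.out.println(format("yyyy", 2017, 12, 31)); // prints 2017
        System.out.println(format("YYYY", 2017, 12, 31)); // prints 2018
    }
}
```

If the calendar year is what the partitioning should reflect, "yyyy" is the safer pattern.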
Analyzing the heap dump, I found that most of the memory is used by org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider, which is referenced from StateStore.
At first glance this looks normal, since that is how Spark keeps aggregation keys in memory. However, I tested by renaming files in the source folder so that Spark would pick them up again. Since the input records are the same, all further rows should be rejected as duplicates and memory consumption should not increase, but it does. Moreover, GC time accounted for more than 30% of total processing time.
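One way memory can grow even when every incoming row is a duplicate is if the state store keeps several versions of the key map in memory, one per committed batch; the title of the linked SPARK-24717 points in that direction. The following is a deliberately simplified model of such a store, not Spark's implementation: the class, its methods, and the retention parameter are hypothetical, and a real provider may share the row objects between versions rather than copy them.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public final class VersionedDedupState {
    private final int versionsToRetain;
    private final Deque<Map<String, Boolean>> versions = new ArrayDeque<>();

    public VersionedDedupState(int versionsToRetain) {
        this.versionsToRetain = versionsToRetain;
        versions.addLast(new HashMap<>()); // version 0: empty state
    }

    /** Processes one micro-batch and returns the keys that had not been seen before. */
    public List<String> processBatch(List<String> keys) {
        // Each batch starts from a copy of the previous version (assumption of this model).
        Map<String, Boolean> next = new HashMap<>(versions.peekLast());
        List<String> emitted = new ArrayList<>();
        for (String key : keys) {
            if (next.putIfAbsent(key, Boolean.TRUE) == null) {
                emitted.add(key);
            }
        }
        versions.addLast(next);
        // Drop the oldest versions beyond the retention limit.
        while (versions.size() > versionsToRetain) {
            versions.removeFirst();
        }
        return emitted;
    }

    /** Number of distinct keys in the latest version. */
    public int distinctKeys() {
        return versions.peekLast().size();
    }

    /** Total number of map entries held across all retained versions. */
    public int retainedEntries() {
        int total = 0;
        for (Map<String, Boolean> version : versions) {
            total += version.size();
        }
        return total;
    }
}
```

In this model, feeding the same three keys repeatedly with a retention of 3 emits nothing after the first batch and keeps distinctKeys() at 3, yet retainedEntries() climbs to 9 before it levels off. With a large retention limit, the plateau is reached only after many batches, which from the outside looks like steadily growing memory.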
Attachments
Issue Links
- is duplicated by SPARK-24717 Split out min retain version of state for memory in HDFSBackedStateStoreProvider (Resolved)