Description
In an aggregation query, a task was lost with the following error:
java.io.IOException: Could not acquire 65536 bytes of memory
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.initializeForWriting(UnsafeExternalSorter.java:169)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:220)
    at org.apache.spark.sql.execution.UnsafeKVExternalSorter.<init>(UnsafeKVExternalSorter.java:126)
    at org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter(UnsafeFixedWidthAggregationMap.java:257)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.switchToSortBasedAggregation(TungstenAggregationIterator.scala:435)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:379)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
    at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Key SQL Query
INSERT INTO TABLE test_table
SELECT ss.ss_customer_sk AS cid,
       count(CASE WHEN i.i_class_id=1 THEN 1 ELSE NULL END) AS id1,
       count(CASE WHEN i.i_class_id=3 THEN 1 ELSE NULL END) AS id3,
       count(CASE WHEN i.i_class_id=5 THEN 1 ELSE NULL END) AS id5,
       count(CASE WHEN i.i_class_id=7 THEN 1 ELSE NULL END) AS id7,
       count(CASE WHEN i.i_class_id=9 THEN 1 ELSE NULL END) AS id9,
       count(CASE WHEN i.i_class_id=11 THEN 1 ELSE NULL END) AS id11,
       count(CASE WHEN i.i_class_id=13 THEN 1 ELSE NULL END) AS id13,
       count(CASE WHEN i.i_class_id=15 THEN 1 ELSE NULL END) AS id15,
       count(CASE WHEN i.i_class_id=2 THEN 1 ELSE NULL END) AS id2,
       count(CASE WHEN i.i_class_id=4 THEN 1 ELSE NULL END) AS id4,
       count(CASE WHEN i.i_class_id=6 THEN 1 ELSE NULL END) AS id6,
       count(CASE WHEN i.i_class_id=8 THEN 1 ELSE NULL END) AS id8,
       count(CASE WHEN i.i_class_id=10 THEN 1 ELSE NULL END) AS id10,
       count(CASE WHEN i.i_class_id=14 THEN 1 ELSE NULL END) AS id14,
       count(CASE WHEN i.i_class_id=16 THEN 1 ELSE NULL END) AS id16
FROM store_sales ss
INNER JOIN item i ON ss.ss_item_sk = i.i_item_sk
WHERE i.i_category IN ('Books')
  AND ss.ss_customer_sk IS NOT NULL
GROUP BY ss.ss_customer_sk
HAVING count(ss.ss_item_sk) > 5
Note: store_sales is a large fact table and item is a small dimension table.
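For readers who want to see what the query computes, here is a minimal sketch that runs a cut-down, two-column version of it against an in-memory SQLite database from Python. The sample rows are invented for illustration, the INSERT INTO target is omitted, and SQLite is only a stand-in engine: this shows the conditional-count, GROUP BY and HAVING logic, and does not reproduce the Spark memory failure reported above.

```python
import sqlite3

# In-memory database with only the columns the query touches.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE store_sales (ss_customer_sk INTEGER, ss_item_sk INTEGER);
CREATE TABLE item (i_item_sk INTEGER, i_class_id INTEGER, i_category TEXT);
""")

# Hypothetical dimension rows: two 'Books' items and one 'Music' item.
conn.executemany(
    "INSERT INTO item VALUES (?, ?, ?)",
    [(1, 1, "Books"), (2, 3, "Books"), (3, 1, "Music")],
)

# Hypothetical fact rows:
#   customer 100: 6 'Books' sales (4 of class 1, 2 of class 3) plus 1 'Music' sale
#   customer 200: 2 'Books' sales, dropped by HAVING count(...) > 5
#   NULL customer: 6 sales, dropped by the IS NOT NULL filter
sales = (
    [(100, 1)] * 4 + [(100, 2)] * 2 + [(100, 3)]
    + [(200, 1)] * 2
    + [(None, 1)] * 6
)
conn.executemany("INSERT INTO store_sales VALUES (?, ?)", sales)

# Same shape as the reported query, reduced to two conditional counts.
rows = conn.execute("""
SELECT ss.ss_customer_sk AS cid,
       count(CASE WHEN i.i_class_id = 1 THEN 1 ELSE NULL END) AS id1,
       count(CASE WHEN i.i_class_id = 3 THEN 1 ELSE NULL END) AS id3
FROM store_sales ss
INNER JOIN item i ON ss.ss_item_sk = i.i_item_sk
WHERE i.i_category IN ('Books')
  AND ss.ss_customer_sk IS NOT NULL
GROUP BY ss.ss_customer_sk
HAVING count(ss.ss_item_sk) > 5
""").fetchall()

print(rows)  # [(100, 4, 2)]
```

Because count ignores NULL, each CASE expression acts as a per-class counter, so the aggregation keeps one row per customer with one counter per class.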
Issue Links
- relates to
  - SPARK-10733 TungstenAggregation cannot acquire page after switching to sort-based (Resolved)
  - SPARK-10783 Do track the pointer array in UnsafeInMemorySorter (Resolved)
  - SPARK-10929 Tungsten fails to acquire memory writing to HDFS (Resolved)