Spark / SPARK-10474

TungstenAggregation cannot acquire memory for pointer array after switching to sort-based

    Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.5.0
    • Fix Version/s: 1.5.1, 1.6.0
    • Component/s: SQL
    • Labels: None

      Description

      While running an aggregation query, a task was lost with the following error.

       java.io.IOException: Could not acquire 65536 bytes of memory
              at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.initializeForWriting(UnsafeExternalSorter.java:169)
              at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:220)
              at org.apache.spark.sql.execution.UnsafeKVExternalSorter.<init>(UnsafeKVExternalSorter.java:126)
              at org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter(UnsafeFixedWidthAggregationMap.java:257)
              at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.switchToSortBasedAggregation(TungstenAggregationIterator.scala:435)
              at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:379)
              at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
              at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110)
              at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
              at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
              at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64)
              at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
              at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
              at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
              at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
              at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
              at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
              at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
              at org.apache.spark.scheduler.Task.run(Task.scala:88)
              at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
              at java.lang.Thread.run(Thread.java:745)
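
One plausible reading of the trace above: by the time the aggregation switches to sort-based mode, the hash map already holds the task's memory, so the sorter's request for a 65536-byte pointer array in initializeForWriting is refused. The sketch below is a toy model of that situation only. TaskMemoryPool, switchToSortBased and their parameters are hypothetical names invented for illustration; they are not Spark's memory manager or the actual fix.

```java
import java.io.IOException;

/** Toy model only: none of these types are Spark classes. */
final class PointerArrayAcquireSketch {

    /** Hypothetical fixed-size memory pool for a single task. */
    static final class TaskMemoryPool {
        private final long capacity;
        private long used;

        TaskMemoryPool(long capacity) { this.capacity = capacity; }

        /** Grants up to `bytes` and returns the amount actually granted (possibly 0). */
        long tryAcquire(long bytes) {
            long granted = Math.min(bytes, capacity - used);
            used += granted;
            return granted;
        }

        /** Returns memory to the pool, never more than is currently in use. */
        void release(long bytes) { used -= Math.min(bytes, used); }
    }

    /** Request size taken from the error message in the report. */
    static final long POINTER_ARRAY_BYTES = 65536;

    /** Models the failing step: the sorter needs memory for its pointer array. */
    static void initializeForWriting(TaskMemoryPool pool) throws IOException {
        long granted = pool.tryAcquire(POINTER_ARRAY_BYTES);
        if (granted != POINTER_ARRAY_BYTES) {
            pool.release(granted); // hand back the partial grant before failing
            throw new IOException(
                "Could not acquire " + POINTER_ARRAY_BYTES + " bytes of memory");
        }
    }

    /**
     * Returns true if the sorter can start after the map has taken `heldByMap`
     * bytes and `freedBeforeSwitch` bytes were handed back before the switch.
     */
    static boolean switchToSortBased(long poolBytes, long heldByMap, long freedBeforeSwitch) {
        TaskMemoryPool pool = new TaskMemoryPool(poolBytes);
        pool.tryAcquire(heldByMap);      // pages filled by hash-based aggregation
        pool.release(freedBeforeSwitch); // whatever is released before sorting begins
        try {
            initializeForWriting(pool);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        long pool = 1 << 20;
        System.out.println("nothing freed: " + switchToSortBased(pool, pool, 0));
        System.out.println("64 KiB freed:  " + switchToSortBased(pool, pool, POINTER_ARRAY_BYTES));
    }
}
```

In this model the switch succeeds only when at least 65536 bytes are free before the sorter initializes, which matches the symptom in the title but says nothing about how Spark resolved it in 1.5.1.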
      

      Key SQL Query

      INSERT INTO TABLE test_table
      SELECT
        ss.ss_customer_sk AS cid,
        count(CASE WHEN i.i_class_id=1  THEN 1 ELSE NULL END) AS id1,
        count(CASE WHEN i.i_class_id=3  THEN 1 ELSE NULL END) AS id3,
        count(CASE WHEN i.i_class_id=5  THEN 1 ELSE NULL END) AS id5,
        count(CASE WHEN i.i_class_id=7  THEN 1 ELSE NULL END) AS id7,
        count(CASE WHEN i.i_class_id=9  THEN 1 ELSE NULL END) AS id9,
        count(CASE WHEN i.i_class_id=11 THEN 1 ELSE NULL END) AS id11,
        count(CASE WHEN i.i_class_id=13 THEN 1 ELSE NULL END) AS id13,
        count(CASE WHEN i.i_class_id=15 THEN 1 ELSE NULL END) AS id15,
        count(CASE WHEN i.i_class_id=2  THEN 1 ELSE NULL END) AS id2,
        count(CASE WHEN i.i_class_id=4  THEN 1 ELSE NULL END) AS id4,
        count(CASE WHEN i.i_class_id=6  THEN 1 ELSE NULL END) AS id6,
        count(CASE WHEN i.i_class_id=8  THEN 1 ELSE NULL END) AS id8,
        count(CASE WHEN i.i_class_id=10 THEN 1 ELSE NULL END) AS id10,
        count(CASE WHEN i.i_class_id=14 THEN 1 ELSE NULL END) AS id14,
        count(CASE WHEN i.i_class_id=16 THEN 1 ELSE NULL END) AS id16
      FROM store_sales ss
      INNER JOIN item i ON ss.ss_item_sk = i.i_item_sk
      WHERE i.i_category IN ('Books')
      AND ss.ss_customer_sk IS NOT NULL
      GROUP BY ss.ss_customer_sk
      HAVING count(ss.ss_item_sk) > 5
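
To make the shape of this aggregation concrete, here is a small in-memory sketch of what the query computes: one group per customer, one counter per i_class_id of interest, and a filter that keeps only groups with more than 5 rows. The class ConditionalCountSketch, the method pivotCounts and the sample rows are hypothetical and exist only for illustration. It assumes every row has already passed the join and WHERE filters, so ss_item_sk is never null.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative only: mirrors the query's grouping logic on a tiny array. */
final class ConditionalCountSketch {

    /**
     * rows[n] = {ss_customer_sk, i_class_id} for each joined, filtered row.
     * Returns customer -> counters, where counters[k] counts rows whose
     * class id equals classIds[k]. Groups with at most `minRows` rows are dropped.
     */
    static Map<Integer, long[]> pivotCounts(int[][] rows, int[] classIds, long minRows) {
        Map<Integer, long[]> counts = new TreeMap<>();
        Map<Integer, Long> totals = new TreeMap<>();
        for (int[] row : rows) {
            int customer = row[0];
            int classId = row[1];
            long[] counters = counts.computeIfAbsent(customer, key -> new long[classIds.length]);
            for (int k = 0; k < classIds.length; k++) {
                // count(CASE WHEN ... THEN 1 ELSE NULL END): NULLs are not counted
                if (classIds[k] == classId) {
                    counters[k]++;
                }
            }
            totals.merge(customer, 1L, Long::sum); // count(ss.ss_item_sk)
        }
        // HAVING count(ss.ss_item_sk) > minRows
        counts.keySet().removeIf(cust -> totals.get(cust) <= minRows);
        return counts;
    }

    public static void main(String[] args) {
        int[][] rows = {
            {1, 1}, {1, 1}, {1, 3}, {1, 5}, {1, 5}, {1, 5}, // customer 1: six rows
            {2, 1}, {2, 3}                                  // customer 2: two rows
        };
        Map<Integer, long[]> result = pivotCounts(rows, new int[] {1, 3, 5}, 5);
        result.forEach((cust, c) -> System.out.println(cust + " -> " + Arrays.toString(c)));
    }
}
```

Each group carries a fixed-width row of counters (15 in the query above), and the number of groups grows with the number of distinct customers in store_sales.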
      

      Note: store_sales is a large fact table and item is a small dimension table.

              People

              • Assignee: Andrew Or (andrewor14)
              • Reporter: Yi Zhou (jameszhouyi)
              • Votes: 0
              • Watchers: 18
