Spark / SPARK-10474

TungstenAggregation cannot acquire memory for pointer array after switching to sort-based


Details

    • Type: Bug
    • Status: Closed
    • Priority: Blocker
    • Resolution: Fixed
    • Affects Version/s: 1.5.0
    • Fix Version/s: 1.5.1, 1.6.0
    • Component/s: SQL
    • Labels: None

    Description

      During an aggregation query, a task was lost with the following error:

       java.io.IOException: Could not acquire 65536 bytes of memory
              at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.initializeForWriting(UnsafeExternalSorter.java:169)
              at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:220)
              at org.apache.spark.sql.execution.UnsafeKVExternalSorter.<init>(UnsafeKVExternalSorter.java:126)
              at org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter(UnsafeFixedWidthAggregationMap.java:257)
              at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.switchToSortBasedAggregation(TungstenAggregationIterator.scala:435)
              at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:379)
              at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
              at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110)
              at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
              at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:119)
              at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:64)
              at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
              at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
              at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
              at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
              at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
              at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
              at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
              at org.apache.spark.scheduler.Task.run(Task.scala:88)
              at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
              at java.lang.Thread.run(Thread.java:745)
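
The trace above reads as if the sorter asks for its 65536-byte pointer array only after the aggregation map has already taken all the memory the task could get. The following is a toy model of that situation, not Spark's code: `ToyMemoryPool`, `switch_to_sort_based`, and `run` are hypothetical names, the pool size is arbitrary, and reserving the pointer array up front is shown only as one possible mitigation, assumed here for illustration.

```python
POINTER_ARRAY_BYTES = 65536  # size named in the IOException above


class ToyMemoryPool:
    """Hypothetical fixed-size pool; a stand-in, not Spark's memory manager."""

    def __init__(self, capacity_bytes):
        self.capacity = capacity_bytes
        self.used = 0

    def try_acquire(self, num_bytes):
        """Grant the full request or nothing; return the bytes granted."""
        if self.used + num_bytes > self.capacity:
            return 0
        self.used += num_bytes
        return num_bytes


def switch_to_sort_based(pool, reserved_bytes=0):
    """Obtain the sorter's pointer array, using a reservation if one exists."""
    if reserved_bytes >= POINTER_ARRAY_BYTES:
        return "ok"  # memory was set aside before the map filled the pool
    if pool.try_acquire(POINTER_ARRAY_BYTES) < POINTER_ARRAY_BYTES:
        raise IOError("Could not acquire %d bytes of memory" % POINTER_ARRAY_BYTES)
    return "ok"


def run(reserve_up_front):
    pool = ToyMemoryPool(capacity_bytes=4 * POINTER_ARRAY_BYTES)
    reserved = pool.try_acquire(POINTER_ARRAY_BYTES) if reserve_up_front else 0
    # The hash map grows page by page until the pool refuses more memory.
    while pool.try_acquire(POINTER_ARRAY_BYTES):
        pass
    try:
        return switch_to_sort_based(pool, reserved)
    except IOError as exc:
        return str(exc)


print(run(reserve_up_front=False))
print(run(reserve_up_front=True))
```

In this model the first call fails with the same message as the report because the map has consumed the whole pool before the switch, while the second succeeds because the pointer array was reserved before the map started growing.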
      

      Key SQL Query

      INSERT INTO TABLE test_table
      SELECT
        ss.ss_customer_sk AS cid,
        count(CASE WHEN i.i_class_id=1  THEN 1 ELSE NULL END) AS id1,
        count(CASE WHEN i.i_class_id=3  THEN 1 ELSE NULL END) AS id3,
        count(CASE WHEN i.i_class_id=5  THEN 1 ELSE NULL END) AS id5,
        count(CASE WHEN i.i_class_id=7  THEN 1 ELSE NULL END) AS id7,
        count(CASE WHEN i.i_class_id=9  THEN 1 ELSE NULL END) AS id9,
        count(CASE WHEN i.i_class_id=11 THEN 1 ELSE NULL END) AS id11,
        count(CASE WHEN i.i_class_id=13 THEN 1 ELSE NULL END) AS id13,
        count(CASE WHEN i.i_class_id=15 THEN 1 ELSE NULL END) AS id15,
        count(CASE WHEN i.i_class_id=2  THEN 1 ELSE NULL END) AS id2,
        count(CASE WHEN i.i_class_id=4  THEN 1 ELSE NULL END) AS id4,
        count(CASE WHEN i.i_class_id=6  THEN 1 ELSE NULL END) AS id6,
        count(CASE WHEN i.i_class_id=8  THEN 1 ELSE NULL END) AS id8,
        count(CASE WHEN i.i_class_id=10 THEN 1 ELSE NULL END) AS id10,
        count(CASE WHEN i.i_class_id=14 THEN 1 ELSE NULL END) AS id14,
        count(CASE WHEN i.i_class_id=16 THEN 1 ELSE NULL END) AS id16
      FROM store_sales ss
      INNER JOIN item i ON ss.ss_item_sk = i.i_item_sk
      WHERE i.i_category IN ('Books')
      AND ss.ss_customer_sk IS NOT NULL
      GROUP BY ss.ss_customer_sk
      HAVING count(ss.ss_item_sk) > 5
      

      Note: store_sales is a large fact table and item is a small dimension table.
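
To make the shape of the query above concrete, here is a minimal Python sketch of what it computes: one output row per customer, with one conditional count per class id, kept only when the customer has more than 5 matching rows. The function name `pivot_counts` and the sample rows are made up for illustration; the input is assumed to be the (customer, class id) pairs that already passed the join and the `i_category` filter.

```python
from collections import Counter, defaultdict


def pivot_counts(rows, class_ids, min_items=5):
    """rows: (customer_sk, class_id) pairs after the join and WHERE filters."""
    per_customer = defaultdict(Counter)
    for customer_sk, class_id in rows:
        if customer_sk is None:  # mirrors: ss.ss_customer_sk IS NOT NULL
            continue
        per_customer[customer_sk][class_id] += 1

    result = {}
    for customer_sk, counts in per_customer.items():
        # mirrors: HAVING count(ss.ss_item_sk) > 5
        if sum(counts.values()) > min_items:
            # mirrors: count(CASE WHEN i.i_class_id=N THEN 1 ELSE NULL END) AS idN
            result[customer_sk] = {"id%d" % c: counts[c] for c in class_ids}
    return result


# Made-up sample data: customer 1 has six rows, customer 2 has three.
sample_rows = [
    (1, 1), (1, 1), (1, 3), (1, 2), (1, 2), (1, 2),
    (2, 1), (2, 1), (2, 5),
    (None, 1),
]
print(pivot_counts(sample_rows, class_ids=[1, 2, 3]))
```

The sketch keeps one in-memory entry per distinct customer key, which is the same general pattern as a hash-based aggregation: with a large fact table, the number of distinct keys drives how much memory the map needs before any fallback to sorting.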

          People

            Assignee: Andrew Or (andrewor14)
            Reporter: Yi Zhou (jameszhouyi)
            Votes: 0
            Watchers: 18
