  1. Spark
  2. SPARK-17788

RangePartitioner results in few very large tasks and many small to empty tasks

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.0.0
    • Fix Version/s: 2.3.0
    • Component/s: Spark Core, SQL
    • Labels: None
    • Environment: Ubuntu 14.04 64bit
      Java 1.8.0_101

    Description

      Greetings everyone,

      I was trying to read a single field of a Hive table stored as Parquet in Spark (~140GB for the entire table, this single field is a Double, ~1.4B records) and look at the sorted output using the following:
      sql("SELECT " + field + " FROM MY_TABLE ORDER BY " + field + " DESC")
      But this simple line of code gives:
      Caused by: java.lang.IllegalArgumentException: Cannot allocate a page with more than 17179869176 bytes

      Same error for:
      sql("SELECT " + field + " FROM MY_TABLE").sort(field)
      and:
      sql("SELECT " + field + " FROM MY_TABLE").orderBy(field)

      After doing some searching, the issue seems to lie in the RangePartitioner trying to create equal ranges. [1]

      [1] https://spark.apache.org/docs/2.0.0/api/java/org/apache/spark/RangePartitioner.html

      The Double values I'm trying to sort are mostly in the range [0,1] (~70% of the data, which roughly equates to 1 billion records); other numbers in the dataset are as high as 2000. With the RangePartitioner trying to create equal ranges, some tasks become almost empty while others are extremely large, due to the heavily skewed distribution.

      This is either a bug in Apache Spark or a major limitation of the framework. I hope one of the devs can help solve this issue.

      P.S. Email thread on Spark user mailing list:
      http://mail-archives.apache.org/mod_mbox/spark-user/201610.mbox/%3CCA%2B_of14hTVYTUHXC%3DmS9Kqd6qegVvkoF-ry3Yj2%2BRT%2BWSBNzhg%40mail.gmail.com%3E

          Activity

            cloud_fan Wenchen Fan added a comment -

            can you provide the full stacktrace? thanks!

            babak.alipour@gmail.com Babak Alipour added a comment -

            The details were in the email thread.
            Here's the full stack trace:

            Caused by: java.lang.IllegalArgumentException: Cannot allocate a page with more than 17179869176 bytes
            at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:241)
            at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:121)
            at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:374)
            at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:396)
            at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:94)
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.sort_addToSorter$(Unknown Source)
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
            at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
            at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
            at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
            at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
            at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
            at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
            at org.apache.spark.scheduler.Task.run(Task.scala:85)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)

            holden Holden Karau added a comment -

            This is semi-expected behaviour: the range partitioner (and really all Spark partitioners) doesn't support splitting a single key across partitions (e.g. if 70% of your data has the same key and you are partitioning on that key, 70% of that data is going to end up in the same partition).

            We could try and fix this in a few ways: either by having Spark SQL do something special in this case, by having Spark's sortBy automatically add "noise" to the key when the sampling indicates there is too much data for a given key, or by allowing partitioners to be non-deterministic and updating the general sortBy logic in Spark.

            I think this would be something good for us to consider, but it's probably going to take a while (and certainly not in time for 2.1.0).

            hvanhovell Herman van Hövell added a comment - - edited

            Spark makes a sketch of your data as soon as you want to order the entire dataset. Based on that sketch, Spark tries to create equally sized partitions. As holdenk said, your problem is caused by skew (a lot of rows with the same key), and none of the current partitioning schemes can help you with this. In the short run, you could follow her suggestion and add noise to the order (this only works for global ordering and not for joins/aggregation with skewed values). In the long run, there is an ongoing effort to reduce skew for joining; see SPARK-9862 for more information.

            I have created the following little Spark program to illustrate how range partitioning works:

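            import org.apache.spark.sql.Row
            // The spark-shell pre-imports the next two; they are only needed elsewhere.
            import org.apache.spark.sql.functions.{lit, rand, when}
            import spark.implicits._  // $"..." column syntax and Dataset encoders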
            
            // Set the partitions and parallelism to relatively low value so we can read the results.
            spark.conf.set("spark.default.parallelism", "20")
            spark.conf.set("spark.sql.shuffle.partitions", "20")
            
            // Create a skewed data frame.
            val df = spark
              .range(10000000)
              .select(
                $"id",
                (rand(34) * when($"id" % 10 <= 7, lit(1.0)).otherwise(lit(10.0))).as("value"))
            
            // Make a summary per partition. The partition intervals should not overlap and the number of
            // elements in a partition should roughly be the same for all partitions.
            case class PartitionSummary(count: Long, min: Double, max: Double, range: Double)
            val res = df.orderBy($"value").mapPartitions { iterator =>
              val (count, min, max) = iterator.foldLeft((0L, Double.PositiveInfinity, Double.NegativeInfinity)) {
                case ((count, min, max), Row(_, value: Double)) =>
                  (count + 1L, Math.min(min, value), Math.max(max, value))
              }
              Iterator.single(PartitionSummary(count, min, max, max - min))
            }
            
            // Get results and make them look nice
            res.orderBy($"min")
              .select($"count", $"min".cast("decimal(5,3)"), $"max".cast("decimal(5,3)"), $"range".cast("decimal(5,3)"))
              .show(30)
            

            This yields the following results (notice how the partition range varies and the row count is relatively similar):

            +------+-----+------+-----+                                                     
            | count|  min|   max|range|
            +------+-----+------+-----+
            |484005|0.000| 0.059|0.059|
            |426212|0.059| 0.111|0.052|
            |381796|0.111| 0.157|0.047|
            |519954|0.157| 0.221|0.063|
            |496842|0.221| 0.281|0.061|
            |539082|0.281| 0.347|0.066|
            |516798|0.347| 0.410|0.063|
            |558487|0.410| 0.478|0.068|
            |419825|0.478| 0.529|0.051|
            |402257|0.529| 0.578|0.049|
            |557225|0.578| 0.646|0.068|
            |518626|0.646| 0.710|0.063|
            |611478|0.710| 0.784|0.075|
            |544556|0.784| 0.851|0.066|
            |454356|0.851| 0.906|0.055|
            |450535|0.906| 0.961|0.055|
            |575996|0.961| 2.290|1.329|
            |525915|2.290| 4.920|2.630|
            |518757|4.920| 7.510|2.590|
            |497298|7.510|10.000|2.490|
            +------+-----+------+-----+
            

            hvanhovell Herman van Hövell added a comment -

            I am closing this one as a duplicate. Feel free to reopen if you disagree.
            holden Holden Karau added a comment -

            I don't think this is a duplicate. It's related, but a join doesn't necessarily use a range partitioner and sortBy is a different operation. I agree the potential solution could share a lot of the same underlying implementation.

            holden Holden Karau added a comment -

            This is somewhat distinct from the join case, but certainly related.

            hvanhovell Herman van Hövell added a comment - - edited

            That is fair. The solution is not that straightforward TBH:

            • Always add some kind of tie breaking value to the range. This could be random, but I'd rather add something like monotonically_increasing_id(). This always incurs some cost.
            • Only add a tie-breaker when you have (or suspect) skew. Here we need to add some heavy hitter algorithm, which is potentially much more resource intensive than reservoir sampling. The other thing is that when we suspect skew, we would need to scan the data again (which would bring the total number of scans to 3).

            So I would be slightly in favor of option 1 and a flag to disable it.
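
            The effect of the first option can be sketched in plain Scala, without Spark. This is a simplified model and not Spark's RangePartitioner: the helper names (rangeBounds, partitionSizes) are hypothetical, the bounds are taken from the full key set rather than from a sample, integer keys stand in for the Double values, and the row index stands in for monotonically_increasing_id().

```scala
object TieBreakerSketch {
  // Upper bounds taken at evenly spaced ranks of the sorted keys.
  // Assumption: the whole key set is used instead of a sample, to stay deterministic.
  def rangeBounds[K: Ordering](keys: Seq[K], numPartitions: Int): Seq[K] = {
    val sorted = keys.sorted.toIndexedSeq
    (1 until numPartitions).map(i => sorted(i * sorted.length / numPartitions - 1))
  }

  // Each key goes to the first partition whose upper bound is >= the key;
  // keys above every bound go to the last partition.
  def partitionSizes[K](keys: Seq[K], numPartitions: Int)(implicit ord: Ordering[K]): Seq[Int] = {
    val bounds = rangeBounds(keys, numPartitions)
    val counts = Array.fill(numPartitions)(0)
    keys.foreach { k =>
      val idx = bounds.indexWhere(b => ord.lteq(k, b))
      counts(if (idx >= 0) idx else numPartitions - 1) += 1
    }
    counts.toSeq
  }

  // 70 rows share the value 0; 30 rows have distinct larger values.
  val values: Seq[Int] = Seq.fill(70)(0) ++ (1 to 30)

  // Pair every value with a unique tie-breaker so that equal values can be
  // split across range bounds.
  val withTieBreaker: Seq[(Int, Long)] =
    values.zipWithIndex.map { case (v, i) => (v, i.toLong) }
}
```

            With 4 partitions, partitionSizes(values, 4) puts all 70 equal keys into one partition (sizes 70, 0, 5, 25), while partitionSizes(withTieBreaker, 4) gives four partitions of 25 rows each. The cost mentioned above is visible here too: every key carries an extra field, whether or not the data is skewed.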

            cloud_fan Wenchen Fan added a comment -

            Should we investigate this?

            Caused by: java.lang.IllegalArgumentException: Cannot allocate a page with more than 17179869176 bytes
            

            Although some partitions can be very large, Spark should be able to process them (slowly) instead of throwing an exception.

            cloud_fan Wenchen Fan added a comment -

            After looking at the code, it seems the only way to trigger this exception is setting `spark.buffer.pageSize` to a value larger than `((1L << 31) - 1) * 8L`, babak.alipour@gmail.com did you set this conf?
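
            For reference, the expression above evaluates to exactly the number in the exception message. The snippet below is only a worked check of that arithmetic; checkPageSize is a hypothetical, simplified stand-in for the size check and is not Spark's actual code.

```scala
object PageSizeLimit {
  // The bound quoted in the comment: ((1L << 31) - 1) * 8L.
  // (1L << 31) - 1 is Int.MaxValue (2147483647); times 8 gives just under 16 GiB.
  val maxPageSizeBytes: Long = ((1L << 31) - 1) * 8L

  // Hypothetical model of the check: reject any request above the bound,
  // using the same wording as the exception message in this ticket.
  def checkPageSize(requestedBytes: Long): Either[String, Long] =
    if (requestedBytes > maxPageSizeBytes)
      Left(s"Cannot allocate a page with more than $maxPageSizeBytes bytes")
    else
      Right(requestedBytes)
}
```

            PageSizeLimit.maxPageSizeBytes is 17179869176, so a request of even one byte more than that would produce the message reported here.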

            babak.alipour@gmail.com Babak Alipour added a comment -

            No, I didn't change that conf. I did try to change `spark.executor.memory` to various values ranging from 8g to 64g; nothing changes and I get the same exception.


            hvanhovell Herman van Hövell added a comment -

            babak.alipour@gmail.com A few questions:

            • Is it possible to get a reproducible piece of code?
            • Could you give us the value of the spark.buffer.pageSize configuration property? When we allocate the memory for a new record, we try to allocate either the page size (which is a Long value) or the size of the record (which is an Int value). The size of the page is larger than the maximum integer value, so this implies the page size is set to a very high value.
            • I am also quite surprised that this is not spilling. Could you give us the value of the spark.shuffle.spill.numElementsForceSpillThreshold configuration property? What is the average row size?
            apachespark Apache Spark added a comment -

            User 'cloud-fan' has created a pull request for this issue:
            https://github.com/apache/spark/pull/18251

            cloud_fan Wenchen Fan added a comment -

            Unfortunately I don't have a reproducible code snippet to prove it has been fixed, but I'm pretty confident my fix should work for it. cc babak.alipour@gmail.com please reopen this ticket if you still hit this issue, thanks!

            sesshomurai Darren Govoni added a comment - - edited

            I'm also running into this error on Spark 2.1.0:
            org.apache.spark.SparkException: Job aborted due to stage failure: Task 42 in stage 11.0 failed 4 times, most recent failure: Lost task 42.3 in stage 11.0 (TID 7544,xxx.xxx.xxx.xxx.xx, executor 2): java.lang.IllegalArgumentException: Cannot allocate a page with more than 17179869176 bytes


            People

              cloud_fan Wenchen Fan
              babak.alipour@gmail.com Babak Alipour