Spark / SPARK-26116

Spark SQL - Sort when writing partitioned parquet leads to OOM errors


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.1.1
    • Fix Version/s: None
    • Component/s: SQL

    Description

      When writing partitioned parquet using partitionBy, it looks like Spark sorts each partition before writing, but this sort consumes a huge amount of memory compared to the size of the data. The executors can then go OOM and get killed by YARN. As a consequence, it also forces us to provision huge amounts of memory compared to the amount of data to be written.

      Error messages found in the Spark UI look like the following:

      Spark UI description of failure : Job aborted due to stage failure: Task 169 in stage 2.0 failed 1 times, most recent failure: Lost task 169.0 in stage 2.0 (TID 98, xxxxxxxxx.xxxxxx.xxxxx.xx, executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 8.1 GB of 8 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
      

       

      Job aborted due to stage failure: Task 66 in stage 4.0 failed 1 times, most recent failure: Lost task 66.0 in stage 4.0 (TID 56, xxxxxxx.xxxxx.xxxxx.xx, executor 1): org.apache.spark.SparkException: Task failed while writing rows
               at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:204)
               at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129)
               at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128)
               at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
               at org.apache.spark.scheduler.Task.run(Task.scala:99)
               at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
               at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
               at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
               at java.lang.Thread.run(Thread.java:745)
      Caused by: java.lang.OutOfMemoryError: error while calling spill() on org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@75194804 : /app/hadoop/yarn/local/usercache/at053351/appcache/application_1537536072724_17039/blockmgr-a4ba7d59-e780-4385-99b4-a4c4fe95a1ec/25/temp_local_a542a412-5845-45d2-9302-bbf5ee4113ad (No such file or directory)
               at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:188)
               at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:254)
               at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:92)
               at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.growPointerArrayIfNecessary(UnsafeExternalSorter.java:347)
               at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertKVRecord(UnsafeExternalSorter.java:425)
               at org.apache.spark.sql.execution.UnsafeKVExternalSorter.insertKV(UnsafeKVExternalSorter.java:160)
               at org.apache.spark.sql.execution.datasources.FileFormatWriter$DynamicPartitionWriteTask.execute(FileFormatWriter.scala:364)
               at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:190)
               at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:188)
               at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1353)
               at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:193)
               ... 8 more

       
      In the stderr logs, we can see that a huge amount of sort data (the partition being sorted here is 250 MB when persisted into memory, deserialized) is being spilled to disk (INFO UnsafeExternalSorter: Thread 155 spilling sort data of 3.6 GB to disk). Sometimes the data is spilled to disk in time and the sort completes (INFO FileFormatWriter: Sorting complete. Writing out partition files one at a time.), but sometimes it is not and we see multiple TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again. messages until the application finally runs OOM with logs such as ERROR UnsafeExternalSorter: Unable to grow the pointer array. Even when it works, GC time is pretty high (~20% of the total write task duration) and I guess that these disk spills further impair performance.

      Contrary to what the above error message suggests, increasing off-heap memory (to 4, 8 and then 16 GiB) through either spark.yarn.executor.memoryOverhead or -XX:MaxDirectMemorySize does not solve the problem. Moreover, the stdout of the crashed executor mentions java.lang.OutOfMemoryError: Java heap space.
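
      For reference, this is roughly how those settings were passed at submit time (a sketch only; the values shown are examples, the actual runs used 4, 8 and then 16 GiB as mentioned above):

          spark-submit \
            --conf spark.yarn.executor.memoryOverhead=8192 \
            --conf "spark.executor.extraJavaOptions=-XX:MaxDirectMemorySize=8g" \
            ...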

      For information, here is the schema of the data I am trying to write as partitioned parquet files. String-type columns contain short strings (12 characters max):

      root
       |-- data_col1: timestamp (nullable = true)
       |-- data_col2: timestamp (nullable = true)
       |-- data_col3: string (nullable = true)
       |-- data_col4: integer (nullable = true)
       |-- data_col5: integer (nullable = true)
       |-- data_col6: string (nullable = true)
       |-- data_col7 (nullable = true)
       |-- data_col8: timestamp (nullable = true)
       |-- data_col9: integer (nullable = true)
       |-- partition_col1: string (nullable = true)
       |-- partition_col2: string (nullable = true)
       |-- partition_col3: integer (nullable = true)
       |-- partition_col4: integer (nullable = true)
       |-- partition_col5: integer (nullable = true)
      

      I should mention that when looking at individual (successful) write tasks in the Spark UI, the Peak Execution Memory metric is always 0.  
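
      For illustration, the write pattern in question is essentially the following (a sketch only; df stands for a DataFrame with the schema above and the output path is a placeholder):

          // Sketch: df has the schema shown above; the output path is a placeholder.
          df.write
            .partitionBy("partition_col1", "partition_col2", "partition_col3",
              "partition_col4", "partition_col5")
            .parquet("hdfs:///path/to/output")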

      It looks like a known issue: SPARK-12546 is explicitly related and led to a PR that decreased the default value of spark.sql.sources.maxConcurrentWrites from 5 to 1. The Spark 1.6.0 release notes also mention this problem as a “Known Issue” and, as described in SPARK-12546, advise tweaking both spark.memory.fraction and spark.hadoop.parquet.memory.pool.ratio, without any explanation of how this should help (and the recommended values do indeed help).
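
      For clarity, that workaround amounts to passing something like the following at submit time (a sketch; the placeholders stand for the values recommended in SPARK-12546 and the 1.6.0 release notes):

          spark-submit \
            --conf spark.memory.fraction=<value from SPARK-12546> \
            --conf spark.hadoop.parquet.memory.pool.ratio=<value from SPARK-12546> \
            ...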

      Could we at least enhance the documentation on this issue? It would be really helpful for me to understand what is happening in terms of memory so that I can better size my application and/or choose the most appropriate memory parameters. Still, how come the sort generates that much data?

      I am running Spark 2.1.1 and do not know whether I would encounter this issue in later versions.

      Many thanks,

      Pierre LIENHART

          People

            Assignee: Unassigned
            Reporter: Pierre Lienhart (plnhrt)
            Votes: 0
            Watchers: 2
