
SPARK-23246: (Py)Spark OOM because of iteratively accumulated metadata that cannot be cleared


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Not A Problem
    • Affects Version/s: 2.2.1
    • Fix Version/s: None
    • Component/s: PySpark, Spark Core, SQL
    • Labels: None

    Description

      I am seeing consistent OOM crashes when using PySpark for iterative algorithms in which I create new DataFrames in each iteration (e.g. by sampling from a "mother" DataFrame), do something with them, and never need them again in later iterations.
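
      For concreteness, the pattern looks roughly like the sketch below (assuming an existing SparkSession named spark; the names and the sampling fraction are illustrative only, and the full reproduction script is further down):

      mother_df = spark.range(1000)              # long-lived "mother" DataFrame

      for i in range(100):
          tmp_df = mother_df.sample(False, 0.1)  # new temporary DataFrame each iteration
          tmp_df.count()                         # do something with it
          # tmp_df is never needed again after this point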

      The script below reproduces such OOM failures. Even when one explicitly calls .unpersist() on the temporary DataFrames (via the --unpersist flag below) and/or deletes and garbage-collects the Python objects (via the --py-gc flag below), the underlying Java objects appear to stay around and accumulate until they exhaust the JVM/driver memory.

      The more complex the temporary DataFrames created in each iteration (controlled by the --n-partitions flag below), the sooner the OOM occurs.

      The typical error messages include:

      • "java.lang.OutOfMemoryError : GC overhead limit exceeded"
      • "Java heap space"
      • "ERROR TransportRequestHandler: Error sending result RpcResponse{requestId=6053742323219781
        161, body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=47 cap=64]}} to /<IP ADDR>; closing connection"

      Please suggest how I can overcome this, so that we can run long-running iterative Spark programs that use resources only up to a bounded, controllable limit.


      from __future__ import print_function
      
      import argparse
      import gc
      import pandas
      
      import pyspark
      
      
      arg_parser = argparse.ArgumentParser()
      arg_parser.add_argument('--unpersist', action='store_true')
      arg_parser.add_argument('--py-gc', action='store_true')
      arg_parser.add_argument('--n-partitions', type=int, default=1000)
      args = arg_parser.parse_args()
      
      
      # create SparkSession (*** set spark.driver.memory to 512m in spark-defaults.conf ***)
      spark = pyspark.sql.SparkSession.builder \
          .config('spark.executor.instances', 2) \
          .config('spark.executor.cores', 2) \
          .config('spark.executor.memory', '512m') \
          .config('spark.ui.enabled', False) \
          .config('spark.ui.retainedJobs', 10) \
          .config('spark.ui.retainedStages', 10) \
          .config('spark.ui.retainedTasks', 10) \
          .enableHiveSupport() \
          .getOrCreate()
      
      
      # create Parquet file for subsequent repeated loading
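      # (partitioning by 'row' yields one partition directory per distinct row value,
      #  i.e. args.n_partitions directories, so a larger --n-partitions makes each
      #  subsequent read's DataFrame more complex, per the description above)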
      df = spark.createDataFrame(
          pandas.DataFrame(
              dict(
                  row=range(args.n_partitions),
                  x=args.n_partitions * [0]
              )
          )
      )
      
      parquet_path = '/tmp/TestOOM-{}Partitions.parquet'.format(args.n_partitions)
      
      df.write.parquet(
          path=parquet_path,
          partitionBy='row',
          mode='overwrite'
      )
      
      
      i = 0
      
      
      # The loop below simulates an iterative algorithm that creates new DataFrames
      # in each iteration (e.g. by sampling from a "mother" DataFrame), does
      # something with them, and never needs them again in later iterations.
      # We are having trouble cleaning up the built-up metadata,
      # hence the program eventually crashes with an OOM error.
      while True:
          _df = spark.read.parquet(parquet_path)
      
          if args.unpersist:
              _df.unpersist()
      
          if args.py_gc:
              del _df
              gc.collect()
      
          i += 1
          print('COMPLETED READ ITERATION #{}\n'.format(i))
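
      Example invocations of the reproduction script above (the file name test_oom.py is illustrative; passing --driver-memory 512m to spark-submit is one way to apply the 512m driver-memory setting mentioned in the comment at the top of the script):

      spark-submit --driver-memory 512m test_oom.py
      spark-submit --driver-memory 512m test_oom.py --unpersist --py-gc --n-partitions 2000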
      


People

    Assignee: Unassigned
    Reporter: V Luong (MBALearnsToCode)