Spark / SPARK-21935

PySpark UDF causing ExecutorLostFailure

    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.1.0
    • Fix Version/s: None
    • Component/s: PySpark

      Description

      Hi,

      I'm using Spark 2.1.0 on AWS EMR (YARN) and trying to use a Python UDF as follows:

      from pyspark.sql.functions import col, udf
      from pyspark.sql.types import StringType

      path = 's3://some/parquet/dir/myfile.parquet'
      df = spark.read.load(path)  # `spark` is the active SparkSession

      def _test_udf(useragent):
          # Plain Python function, executed row by row in the Python workers
          return useragent.upper()

      # Wrap the function as a UDF that returns a string column
      test_udf = udf(_test_udf, StringType())
      df = df.withColumn('test_field', test_udf(col('std_useragent')))
      df.write.parquet('/output.parquet')
      

      The following config is used in spark-defaults.conf (using maximizeResourceAllocation on EMR):

      ...
      spark.executor.instances         4
      spark.executor.cores             8
      spark.driver.memory              8G
      spark.executor.memory            9658M
      spark.default.parallelism        64
      spark.driver.maxResultSize       3G
      ...
      

      The cluster has 4 worker nodes (+1 master) with the following specs: 8 vCPUs, 15 GiB memory, 160 GB SSD storage.

      The above example fails every single time with errors like the following:

      17/09/06 09:58:08 WARN TaskSetManager: Lost task 26.1 in stage 1.0 (TID 50, ip-172-31-7-125.eu-west-1.compute.internal, executor 10): ExecutorLostFailure (executor 10 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 10.4 GB of 10.4 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
      

      I tried increasing spark.yarn.executor.memoryOverhead to 3000, which delays the errors, but they still appear before the job finishes and the job eventually fails.
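
As a rough sanity check on the limit quoted in the error message, the sketch below recomputes the YARN container size from the settings above. It assumes the documented Spark 2.x default for spark.yarn.executor.memoryOverhead (10% of spark.executor.memory, with a 384 MB minimum) and ignores any rounding YARN applies to the request. The function names are illustrative only and are not Spark APIs.

```python
# Back-of-the-envelope YARN container sizing for the settings in this report.
# Assumption: spark.yarn.executor.memoryOverhead defaults to
# max(384 MB, 10% of spark.executor.memory), as documented for Spark 2.x on YARN.

OVERHEAD_FACTOR = 0.10   # documented default fraction of executor memory
OVERHEAD_MIN_MB = 384    # documented default minimum overhead


def default_overhead_mb(executor_memory_mb):
    """Default off-heap overhead in MB (hypothetical helper, not a Spark API)."""
    return max(int(executor_memory_mb * OVERHEAD_FACTOR), OVERHEAD_MIN_MB)


def container_limit_mb(executor_memory_mb, overhead_mb=None):
    """Executor heap plus overhead: the size requested from YARN per executor."""
    if overhead_mb is None:
        overhead_mb = default_overhead_mb(executor_memory_mb)
    return executor_memory_mb + overhead_mb


executor_memory_mb = 9658   # spark.executor.memory from spark-defaults.conf
executor_cores = 8          # spark.executor.cores

default_limit = container_limit_mb(executor_memory_mb)
raised_limit = container_limit_mb(executor_memory_mb, overhead_mb=3000)

print("default overhead: %d MB" % default_overhead_mb(executor_memory_mb))
print("default limit: %d MB (%.1f GB)" % (default_limit, default_limit / 1024.0))
print("limit with overhead=3000: %d MB (%.1f GB)" % (raised_limit, raised_limit / 1024.0))
print("default overhead per core: %d MB"
      % (default_overhead_mb(executor_memory_mb) // executor_cores))
```

Under these assumptions the default limit works out to 10623 MB, which is consistent with the 10.4 GB in the log. Memory used outside the JVM heap, which includes the Python worker processes that run the UDF, has to fit in the overhead portion, so with 8 cores per executor the default leaves roughly 120 MB of headroom per concurrent task.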

      If I run the equivalent job in Scala, everything works as expected (without having to adjust the memoryOverhead):

      import org.apache.spark.sql.functions.{col, udf}
      
      val upper: String => String = _.toUpperCase
      val df = spark.read.load("s3://some/parquet/dir/myfile.parquet")
      val upperUDF = udf(upper)
      val newdf = df.withColumn("test_field", upperUDF(col("std_useragent")))
      newdf.write.parquet("/output.parquet")
      

      CPU utilisation is also very poor with PySpark.

      Is this a known bug with PySpark and UDFs, or is it a matter of bad configuration?
      Looking forward to suggestions. Thanks!

        Attachments

        1. Screen Shot 2017-09-06 at 11.31.13.png
          38 kB
          Nikolaos Tsipas
        2. Screen Shot 2017-09-06 at 11.31.31.png
          61 kB
          Nikolaos Tsipas
        3. Screen Shot 2017-09-06 at 11.30.28.png
          311 kB
          Nikolaos Tsipas
        4. cpu.png
          363 kB
          Nikolaos Tsipas

            People

            • Assignee: Unassigned
            • Reporter: nicktgr15 Nikolaos Tsipas
            • Votes: 0
            • Watchers: 3
