Spark / SPARK-26320

udf with multiple arrays as input


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Versions: 2.1.1, 2.3.1
    • Fix Versions: None
    • Component: SQL
    • Labels: None

    Description

      Spark fails with a GC out-of-memory error ("GC overhead limit exceeded") when a UDF that takes several arrays as input is called many times.

      Setup:

      1. 3 different input arrays
      2. Each array has size 10

       

      UDF:

      val getResult = udf(
        (index: Integer, array1: Seq[Integer], array2: Seq[Integer], array3: Seq[Double]) =>
          doSomeThing(index, array1, array2, array3)
      )
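
      One detail worth noting when calling a Scala UDF like this: every argument passed to the returned function must be a Column, so a scalar such as the index has to be wrapped in lit(...). Below is a minimal self-contained sketch of the pattern (an illustration, not the reporter's actual code): it runs Spark in local mode, uses Scala Int in place of java.lang.Integer, and substitutes a hypothetical doSomeThing body.

      ```scala
      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.functions.{array, col, lit, udf}

      object GetResultSketch {
        // Hypothetical stand-in for the doSomeThing referenced in the report
        def doSomeThing(index: Int, a1: Seq[Int], a2: Seq[Int], a3: Seq[Double]): Int =
          index + a1.size + a2.size + a3.size

        def main(args: Array[String]): Unit = {
          val spark = SparkSession.builder().master("local[1]").appName("udf-sketch").getOrCreate()
          import spark.implicits._

          val getResult = udf(
            (index: Int, a1: Seq[Int], a2: Seq[Int], a3: Seq[Double]) =>
              doSomeThing(index, a1, a2, a3)
          )

          val df = Seq((1, 2, 0.5)).toDF("frequency_1", "code_1", "power_1")

          // Scalar arguments must be Columns: wrap the index in lit()
          df.withColumn("out1",
              getResult(lit(0), array(col("frequency_1")), array(col("code_1")), array(col("power_1"))))
            .show()

          spark.stop()
        }
      }
      ```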
      

      DataFrame Schema:

      root
      |-- frequency_1: integer (nullable = true)
      |-- code_1: integer (nullable = true)
      |-- power_1: integer (nullable = false)
      |-- frequency_2: integer (nullable = true)
      |-- code_2: integer (nullable = true)
      |-- power_2: integer (nullable = false)
      |-- frequency_3: integer (nullable = true)
      |-- code_3: integer (nullable = true)
      |-- power_3: integer (nullable = false)
      |-- frequency_4: integer (nullable = true)
      |-- code_4: integer (nullable = true)
      |-- power_4: integer (nullable = false)
      |-- frequency_5: integer (nullable = true)
      |-- code_5: integer (nullable = true)
      |-- power_5: integer (nullable = false)
      |-- frequency_6: integer (nullable = true)
      |-- code_6: integer (nullable = true)
      |-- power_6: integer (nullable = false)
      |-- frequency_7: integer (nullable = true)
      |-- code_7: integer (nullable = true)
      |-- power_7: double (nullable = true)
      |-- frequency_8: integer (nullable = true)
      |-- code_8: integer (nullable = true)
      |-- power_8: double (nullable = true)
      |-- frequency_9: integer (nullable = true)
      |-- code_9: integer (nullable = true)
      |-- power_9: double (nullable = true)
      |-- frequency_10: integer (nullable = true)
      |-- code_10: integer (nullable = true)
      |-- power_10: double (nullable = true)

      Call the UDF 10 times via withColumn to enrich the DataFrame:

       

      df.withColumn("out1", getResult(lit(0),
          array(col("frequency_1"), col("frequency_2"), ...., col("frequency_10")),
          array(col("code_1"), col("code_2"), ...., col("code_10")),
          array(col("power_1"), col("power_2"), ...., col("power_10"))))
        .withColumn("out2", getResult(lit(0),
          array(col("frequency_1"), col("frequency_2"), ...., col("frequency_10")),
          array(col("code_1"), col("code_2"), ...., col("code_10")),
          array(col("power_1"), col("power_2"), ...., col("power_10"))))
        ....
        .withColumn("out9", getResult(lit(9), ....
        .withColumn("out10", getResult(lit(10),
          array(col("frequency_1"), col("frequency_2"), ...., col("frequency_10")),
          array(col("code_1"), col("code_2"), ...., col("code_10")),
          array(col("power_1"), col("power_2"), ...., col("power_10"))))
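
      A plausible contributor to the memory blow-up (an assumption, not a confirmed diagnosis from this ticket) is that the three array(...) expressions are rebuilt inline in each of the 10 chained withColumn calls, inflating Catalyst's expression tree. A sketch of a possible mitigation, assuming the getResult UDF defined above: materialize each array column once and issue a single select instead of 10 chained withColumn calls.

      ```scala
      import org.apache.spark.sql.functions.{array, col, lit}

      // Build each array expression once and reuse the resulting columns,
      // instead of re-creating the three array(...) expressions in every call.
      val withArrays = df
        .withColumn("freqs",  array((1 to 10).map(i => col(s"frequency_$i")): _*))
        .withColumn("codes",  array((1 to 10).map(i => col(s"code_$i")): _*))
        .withColumn("powers", array((1 to 10).map(i => col(s"power_$i")): _*))

      // One select carrying all 10 output columns keeps the plan flat,
      // compared with 10 chained withColumn calls over large expressions.
      val result = withArrays.select(
        (withArrays.columns.map(col) ++
          (1 to 10).map(i => getResult(lit(i), col("freqs"), col("codes"), col("powers")).as(s"out$i"))): _*
      )
      ```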
      

       

       

      Error Log:

      12:56:08.461 [dispatcher-event-loop-3] ERROR o.a.s.scheduler.TaskSchedulerImpl - Lost executor driver on localhost: Executor heartbeat timed out after 150014 ms
      [info] com.xxx.xx.xx.xx.xx.xx.xx.xx.CSVExporterSpec *** ABORTED *** (9 minutes, 24 seconds)
      [info] java.lang.OutOfMemoryError: GC overhead limit exceeded
      [info] ...
      [error] Uncaught exception when running com.xxx.xx.xx.xx.xx.xx.xx.xx.CSVExporterSpec: java.lang.OutOfMemoryError: GC overhead limit exceeded
      java.lang.OutOfMemoryError: GC overhead limit exceeded
       at java.util.AbstractList.iterator(AbstractList.java:288)
       at org.apache.cassandra.gms.Gossiper.addLocalApplicationStates(Gossiper.java:1513)
       at org.apache.cassandra.gms.Gossiper.addLocalApplicationState(Gossiper.java:1505)
       at org.apache.cassandra.service.LoadBroadcaster$1.run(LoadBroadcaster.java:92)
       at org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor$UncomplainingRunnable.run(DebuggableScheduledThreadPoolExecutor.java:118)
       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
       at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
       at org.apache.cassandra.concurrent.NamedThreadFactory.lambda$threadLocalDeallocator$0(NamedThreadFactory.java:81)
       at org.apache.cassandra.concurrent.NamedThreadFactory$$Lambda$4/821749187.run(Unknown Source)
       at java.lang.Thread.run(Thread.java:745)
      12:56:13.552 [ScheduledTasks:1] ERROR o.a.c.utils.JVMStabilityInspector - JVM state determined to be unstable. Exiting forcefully due to:
      java.lang.OutOfMemoryError: GC overhead limit exceeded
       at java.util.AbstractList.iterator(AbstractList.java:288)
       at org.apache.cassandra.gms.Gossiper.addLocalApplicationStates(Gossiper.java:1513)
       at org.apache.cassandra.gms.Gossiper.addLocalApplicationState(Gossiper.java:1505)
       at org.apache.cassandra.service.LoadBroadcaster$1.run(LoadBroadcaster.java:92)
       at org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor$UncomplainingRunnable.run(DebuggableScheduledThreadPoolExecutor.java:118)
       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
       at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
       at org.apache.cassandra.concurrent.NamedThreadFactory.lambda$threadLocalDeallocator$0(NamedThreadFactory.java:81)
       at org.apache.cassandra.concurrent.NamedThreadFactory$$Lambda$4/821749187.run(Unknown Source)
       at java.lang.Thread.run(Thread.java:745)

       

       

      People

        Assignee: Unassigned
        Reporter: Rio Wibowo (rwibowo)