Spark / SPARK-15403

LinearRegressionWithSGD fails on files more than 12Mb data


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Blocker
    • Resolution: Duplicate
    • Affects Version/s: 1.6.1
    • Fix Version/s: None
    • Component/s: MLlib
    • Labels:
      None
    • Environment:

      Ubuntu 14.04 with 8 GB RAM, Scala 2.11.7, with the following memory settings for my project: JAVA_OPTS="-Xmx8G -Xms2G".

    • Flags:
      Important

      Description

      I parse my JSON-like data using the DataFrame and Spark SQL facilities, then scale one numerical feature and create dummy variables for the categorical features. From the 14 initial keys of my JSON-like file I end up with about 200-240 features in the final LabeledPoint. The final data is sparse, and every file contains at least 50000 observations. I run two types of algorithm on the data, LinearRegressionWithSGD or LassoWithSGD, since the data is sparse and regularization might be required. For data larger than 11 MB, LinearRegressionWithSGD fails with the following error:

      org.apache.spark.SparkException: Job aborted due to stage failure: Task 58 in stage 346.0 failed 1 times, most recent failure: Lost task 58.0 in stage 346.0 (TID 18140, localhost): ExecutorLostFailure (executor driver exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 179307 ms

      I tried to reproduce this bug with a smaller example, and I suspect that something is wrong with LinearRegressionWithSGD on large data sets. I noticed that a collect() is performed both by StandardScaler in the preprocessing step and by the counts in the linear-regression step, which may cause the bug. This calls the scalability of linear regression into question: as far as I understand it, collect() runs on the driver, so the point of distributing the computation is lost.
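      For contrast, the per-iteration gradient computation in GradientDescent.runMiniBatchSGD (visible in the stack trace below) goes through treeAggregate rather than a driver-side collect(). A minimal plain-Scala sketch of that seqOp/combOp pattern for least squares — hypothetical names, no Spark dependency — looks like:

      ```scala
      // Sketch of the seqOp/combOp pattern treeAggregate uses inside
      // GradientDescent: each partition folds its points into a
      // (gradientSum, lossSum, count) triple, and triples are merged
      // pairwise, so the raw data never has to reach the driver.
      object GradientAggregateSketch {
        type Point = (Double, Array[Double])       // (label, features)
        type Acc   = (Array[Double], Double, Long) // (gradient sum, loss sum, count)

        // Least-squares gradient/loss contribution of one point under weights w.
        def seqOp(w: Array[Double])(acc: Acc, p: Point): Acc = {
          val (label, x) = p
          val pred = x.zip(w).map { case (xi, wi) => xi * wi }.sum
          val err  = pred - label
          val grad = acc._1.zip(x).map { case (g, xi) => g + err * xi }
          (grad, acc._2 + 0.5 * err * err, acc._3 + 1L)
        }

        // Merge two partial results (the combOp half of the aggregate).
        def combOp(a: Acc, b: Acc): Acc =
          (a._1.zip(b._1).map { case (x, y) => x + y }, a._2 + b._2, a._3 + b._3)

        // Emulate the aggregate over a sequence of "partitions" of points.
        def aggregate(w: Array[Double], partitions: Seq[Seq[Point]]): Acc = {
          val zero: Acc = (Array.fill(w.length)(0.0), 0.0, 0L)
          partitions.map(_.foldLeft(zero)(seqOp(w))).reduce(combOp)
        }
      }
      ```

      Only the small (gradientSum, lossSum, count) triple travels between executors and driver, which is why the aggregation itself should scale with the data size.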

      import org.apache.spark.mllib.linalg.Vectors
      import org.apache.spark.mllib.regression.{LabeledPoint, LassoWithSGD}
      import org.apache.spark.rdd.RDD
      import org.apache.spark.{SparkConf, SparkContext}
      import org.apache.spark.sql.SQLContext
      
      
      object Main2 {
      
        def main(args: Array[String]): Unit = {
      
          // Spark configuration for execution on a local computer, 4 cores, 8 GB RAM
          val conf = new SparkConf()
            .setMaster("local[*]")
            .setAppName("spark_linear_regression_bug_report")
            // multiple configurations were tried for driver/executor memory, including the defaults
            .set("spark.driver.memory", "3g")
            .set("spark.executor.memory", "3g")
            .set("spark.executor.heartbeatInterval", "30s")
      
          // Spark context and SQL context definitions
          val sc = new SparkContext(conf)
          val sqlContext = new SQLContext(sc)
      
          val countFeatures = 500
          val countList = 500000
      
          val features = sc.broadcast(1 to countFeatures)
      
          // Synthetic data: one LabeledPoint per row, with countFeatures random 0/1 features
          val rdd: RDD[LabeledPoint] = sc.range(1, countList).map { i =>
            LabeledPoint(
              label = i.toDouble,
              features = Vectors.dense(features.value.map(_ => scala.util.Random.nextInt(2).toDouble).toArray)
            )
          }.persist()
      
          val numIterations = 1000
          val stepSize = 0.3
          val algorithm = new LassoWithSGD() // LinearRegressionWithSGD fails the same way
          algorithm.setIntercept(true)
          algorithm.optimizer
            .setNumIterations(numIterations)
            .setStepSize(stepSize)
          val model = algorithm.run(rdd)
      
        }
      }
      
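      Note that the repro above uses dense vectors, while the real data described earlier is sparse. MLlib's Vectors.sparse(size, indices, values) stores only the non-zero entries; a plain-Scala sketch of that (size, indices, values) compression — hypothetical helper, no Spark dependency — for the 0/1 dummy features:

      ```scala
      // Sketch of the (size, indices, values) sparse layout used by
      // org.apache.spark.mllib.linalg.Vectors.sparse: only non-zero entries
      // are kept, which matters with 200+ mostly-zero dummy features per row.
      object SparseSketch {
        // Compress a dense feature array into (size, indices, values).
        def toSparse(dense: Array[Double]): (Int, Array[Int], Array[Double]) = {
          val nz = dense.zipWithIndex.filter(_._1 != 0.0) // keep non-zero (value, index) pairs
          (dense.length, nz.map(_._2), nz.map(_._1))
        }
      }
      ```

      The triple maps directly onto the Vectors.sparse arguments, so a sparse variant of the repro would only change the feature-construction line.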
      

      The complete error output:

      [info] Running Main
      WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
      WARN org.apache.spark.util.Utils - Your hostname, julien-ubuntu resolves to a loopback address: 127.0.1.1; using 192.168.0.49 instead (on interface wlan0)
      WARN org.apache.spark.util.Utils - Set SPARK_LOCAL_IP if you need to bind to another address
      INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
      INFO Remoting - Starting remoting
      INFO Remoting - Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.0.49:59897]
      INFO org.spark-project.jetty.server.Server - jetty-8.y.z-SNAPSHOT
      INFO org.spark-project.jetty.server.AbstractConnector - Started SelectChannelConnector@0.0.0.0:4040
      WARN com.github.fommil.netlib.BLAS - Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
      WARN com.github.fommil.netlib.BLAS - Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
      [Stage 51:===========================================> (3 + 1) / 4]ERROR org.apache.spark.util.Utils - Uncaught exception in thread driver-heartbeater
      java.io.IOException: java.lang.ClassNotFoundException: org.apache.spark.storage.BroadcastBlockId
      at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1207) ~[spark-core_2.11-1.6.1.jar:1.6.1]
      at org.apache.spark.executor.TaskMetrics.readObject(TaskMetrics.scala:219) ~[spark-core_2.11-1.6.1.jar:1.6.1]
      at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source) ~[na:na]
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_91]
      at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_91]
      at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) ~[na:1.8.0_91]
      at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909) ~[na:1.8.0_91]
      at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) ~[na:1.8.0_91]
      at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) ~[na:1.8.0_91]
      at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) ~[na:1.8.0_91]
      at org.apache.spark.util.Utils$.deserialize(Utils.scala:92) ~[spark-core_2.11-1.6.1.jar:1.6.1]
      at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:437) ~[spark-core_2.11-1.6.1.jar:1.6.1]
      at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:427) ~[spark-core_2.11-1.6.1.jar:1.6.1]
      at scala.Option.foreach(Option.scala:257) ~[scala-library-2.11.7.jar:1.0.0-M1]
      at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:427) ~[spark-core_2.11-1.6.1.jar:1.6.1]
      at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:425) ~[spark-core_2.11-1.6.1.jar:1.6.1]
      at scala.collection.Iterator$class.foreach(Iterator.scala:742) ~[scala-library-2.11.7.jar:1.0.0-M1]
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) ~[scala-library-2.11.7.jar:1.0.0-M1]
      at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[scala-library-2.11.7.jar:1.0.0-M1]
      at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[scala-library-2.11.7.jar:1.0.0-M1]
      at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:425) ~[spark-core_2.11-1.6.1.jar:1.6.1]
      at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:470) ~[spark-core_2.11-1.6.1.jar:1.6.1]
      at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:470) ~[spark-core_2.11-1.6.1.jar:1.6.1]
      at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:470) ~[spark-core_2.11-1.6.1.jar:1.6.1]
      at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765) ~[spark-core_2.11-1.6.1.jar:1.6.1]
      at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:470) [spark-core_2.11-1.6.1.jar:1.6.1]
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_91]
      at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_91]
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_91]
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_91]
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_91]
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_91]
      at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]
      Caused by: java.lang.ClassNotFoundException: org.apache.spark.storage.BroadcastBlockId
      at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[na:1.8.0_91]
      at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_91]
      at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_91]
      at java.lang.Class.forName0(Native Method) ~[na:1.8.0_91]
      at java.lang.Class.forName(Class.java:348) ~[na:1.8.0_91]
      at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:628) ~[na:1.8.0_91]
      at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620) ~[na:1.8.0_91]
      at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521) ~[na:1.8.0_91]
      at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781) ~[na:1.8.0_91]
      at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) ~[na:1.8.0_91]
      at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) ~[na:1.8.0_91]
      at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) ~[na:1.8.0_91]
      at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) ~[na:1.8.0_91]
      at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) ~[na:1.8.0_91]
      at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) ~[na:1.8.0_91]
      at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479) ~[scala-library-2.11.7.jar:1.0.0-M1]
      at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source) ~[na:na]
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_91]
      at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_91]
      at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) ~[na:1.8.0_91]
      at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909) ~[na:1.8.0_91]
      at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) ~[na:1.8.0_91]
      at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) ~[na:1.8.0_91]
      at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) ~[na:1.8.0_91]
      at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) ~[na:1.8.0_91]
      at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) ~[na:1.8.0_91]
      at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) ~[na:1.8.0_91]
      at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) ~[na:1.8.0_91]
      at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:503) ~[na:1.8.0_91]
      at org.apache.spark.executor.TaskMetrics$$anonfun$readObject$1.apply$mcV$sp(TaskMetrics.scala:220) ~[spark-core_2.11-1.6.1.jar:1.6.1]
      at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204) ~[spark-core_2.11-1.6.1.jar:1.6.1]
      ... 32 common frames omitted
      WARN org.apache.spark.HeartbeatReceiver - Removing executor driver with no recent heartbeats: 175339 ms exceeds timeout 120000 ms
      ERROR org.apache.spark.scheduler.TaskSchedulerImpl - Lost executor driver on localhost: Executor heartbeat timed out after 175339 ms
      WARN org.apache.spark.scheduler.TaskSetManager - Lost task 1.0 in stage 105.0 (TID 420, localhost): ExecutorLostFailure (executor driver exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 175339 ms
      ERROR org.apache.spark.scheduler.TaskSetManager - Task 1 in stage 105.0 failed 1 times; aborting job
      WARN org.apache.spark.SparkContext - Killing executors is only supported in coarse-grained mode
      [error] (run-main-0) org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 105.0 failed 1 times, most recent failure: Lost task 1.0 in stage 105.0 (TID 420, localhost): ExecutorLostFailure (executor driver exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 175339 ms
      [error] Driver stacktrace:
      org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 105.0 failed 1 times, most recent failure: Lost task 1.0 in stage 105.0 (TID 420, localhost): ExecutorLostFailure (executor driver exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 175339 ms
      Driver stacktrace:
      at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
      at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
      at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
      at scala.Option.foreach(Option.scala:257)
      at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
      at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
      at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
      at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:1952)
      at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1025)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
      at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
      at org.apache.spark.rdd.RDD.reduce(RDD.scala:1007)
      at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1150)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
      at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
      at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1127)
      at org.apache.spark.mllib.optimization.GradientDescent$.runMiniBatchSGD(GradientDescent.scala:227)
      at org.apache.spark.mllib.optimization.GradientDescent.optimize(GradientDescent.scala:128)
      at org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:308)
      at org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:229)
      at Main$.main(Main.scala:85)
      at Main.main(Main.scala)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:498)
      [trace] Stack trace suppressed: run last compile:run for the full output.
      ERROR org.apache.spark.ContextCleaner - Error in cleaning thread
      java.lang.InterruptedException: null
      at java.lang.Object.wait(Native Method) ~[na:1.8.0_91]
      at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143) ~[na:1.8.0_91]
      at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:176) ~[spark-core_2.11-1.6.1.jar:1.6.1]
      at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1180) [spark-core_2.11-1.6.1.jar:1.6.1]
      at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:173) [spark-core_2.11-1.6.1.jar:1.6.1]
      at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:68) [spark-core_2.11-1.6.1.jar:1.6.1]
      ERROR org.apache.spark.util.Utils - uncaught error in thread SparkListenerBus, stopping SparkContext
      java.lang.InterruptedException: null
      at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998) ~[na:1.8.0_91]
      at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) ~[na:1.8.0_91]
      at java.util.concurrent.Semaphore.acquire(Semaphore.java:312) ~[na:1.8.0_91]
      at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(AsynchronousListenerBus.scala:66) ~[spark-core_2.11-1.6.1.jar:1.6.1]
      at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65) ~[spark-core_2.11-1.6.1.jar:1.6.1]
      at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65) ~[spark-core_2.11-1.6.1.jar:1.6.1]
      at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) ~[scala-library-2.11.7.jar:1.0.0-M1]
      at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:64) ~[spark-core_2.11-1.6.1.jar:1.6.1]
      at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1180) ~[spark-core_2.11-1.6.1.jar:1.6.1]
      at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63) [spark-core_2.11-1.6.1.jar:1.6.1]
      java.lang.RuntimeException: Nonzero exit code: 1
      at scala.sys.package$.error(package.scala:27)
      [trace] Stack trace suppressed: run last compile:run for the full output.
      [error] (compile:run) Nonzero exit code: 1
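      As a stopgap while the root cause is investigated, the 120000 ms timeout being exceeded in the log above can be raised through standard Spark settings (this only postpones the failure, it does not fix it); a configuration sketch:

      ```scala
      import org.apache.spark.SparkConf

      // Workaround sketch: raise the network timeout that governs the
      // heartbeat deadline (120 s by default), keeping the heartbeat
      // interval well below it. Values here are illustrative.
      val conf = new SparkConf()
        .set("spark.network.timeout", "600s")
        .set("spark.executor.heartbeatInterval", "60s")
      ```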

              People

               • Assignee:
                 Unassigned
               • Reporter:
                 rain_dev Ana La
               • Votes:
                 1
               • Watchers:
                 2
