
SPARK-7624: Task scheduler delay is increasing over time in Spark local mode



    Description

      I am running a simple Spark Streaming program with Spark 1.3.1 in local mode. It receives JSON strings from a socket at a rate of 50 events per second. It runs well for the first 6 hours (although the minor GC count per minute keeps increasing the whole time); after that, the scheduler delay of every task grows noticeably, from about 10 ms to 100 ms, and after 10 hours of running the delay is about 800 ms while CPU usage has climbed from 2% to 30%. As a result, the streaming job can no longer finish within one batch interval (5 seconds).

      I dumped the Java heap after 16 hours and found about 200,000 org.apache.spark.scheduler.local.ReviveOffers objects in akka.actor.LightArrayRevolverScheduler$TaskQueue[]. Checking the code, there is only one place that can put ReviveOffers into the Akka LightArrayRevolverScheduler: LocalActor::reviveOffers.

      def reviveOffers() {
        val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
        val tasks = scheduler.resourceOffers(offers).flatten
        for (task <- tasks) {
          freeCores -= scheduler.CPUS_PER_TASK
          executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber,
            task.name, task.serializedTask)
        }

        if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty) {
          // Try to reviveOffer after 1 second, because scheduler may wait for locality timeout
          context.system.scheduler.scheduleOnce(1000 millis, self, ReviveOffers)
        }
      }
      

      I removed the last three lines of this method (the whole if block, which was introduced by https://issues.apache.org/jira/browse/SPARK-4939). With that change the job ran smoothly for more than 20 hours and the scheduler delay stayed around 10 ms the whole time. So it seems there is some condition under which duplicate ReviveOffers messages get scheduled. I am not sure why this happens, but I believe it is the root cause of this issue.
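
      For illustration only (this is not the actual fix applied for this issue), one way to avoid piling up delayed ReviveOffers messages would be to keep at most one delayed retry outstanding at a time, for example by sending a distinct retry message guarded by a flag. The names ReviveOffersRetry and retryPending below are invented for this sketch, which is based on the snippet above:

        // Sketch only, not the actual SPARK-7624 patch.
        case object ReviveOffersRetry

        private var retryPending = false

        override def receiveWithLogging = {
          case ReviveOffers =>
            reviveOffers()
          case ReviveOffersRetry =>
            retryPending = false   // the one outstanding delayed retry has fired
            reviveOffers()
          // ... other messages unchanged
        }

        def reviveOffers() {
          val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
          val tasks = scheduler.resourceOffers(offers).flatten
          for (task <- tasks) {
            freeCores -= scheduler.CPUS_PER_TASK
            executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber,
              task.name, task.serializedTask)
          }

          // Schedule at most one delayed retry at a time, instead of one per call.
          if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty && !retryPending) {
            retryPending = true
            context.system.scheduler.scheduleOnce(1000 millis, self, ReviveOffersRetry)
          }
        }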

      My Spark settings (a context setup matching these is sketched below):

      1. Memory: 3G
      2. CPU: 8 cores
      3. Streaming batch interval: 5 seconds
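
      The context setup is not included in the report; a minimal sketch consistent with these settings might look like the following (the application name, the master string, and how the driver memory is passed are assumptions):

        import org.apache.spark.SparkConf
        import org.apache.spark.streaming.{Seconds, StreamingContext}

        // Sketch only: the app name and master string are assumptions.
        // The 3G of memory would normally be given to spark-submit as --driver-memory 3g.
        val conf = new SparkConf()
          .setMaster("local[8]")                 // local mode, 8 cores
          .setAppName("order-spending-stream")

        val ssc = new StreamingContext(conf, Seconds(5))   // 5 second batch interval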

      Here is my streaming code:

      val input = ssc.socketTextStream(hostname, port, StorageLevel.MEMORY_ONLY_SER)
        .mapPartitions(
          // parse the JSON strings into Order objects
          Order(_), preservePartitioning = true)
      val mresult = input.map(
        v => (v.customer, UserSpending(v.customer, v.count * v.price, v.timestamp.toLong))).cache()
      val tempr = mresult.window(
          Seconds(firstStageWindowSize),
          Seconds(firstStageWindowSize)
        ).transform(
          rdd => rdd.union(rdd).union(rdd).union(rdd)
        )
      tempr.count.print
      tempr.cache().foreachRDD((rdd, t) => {
        for (i <- 1 to 5) {
          val c = rdd.filter(x => scala.util.Random.nextInt(5) == i).count()
          println("T: " + t + ": " + c)
        }
      })
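
      The Order and UserSpending types used above are not included in the report; a minimal sketch of what they might look like, assuming a flat JSON payload whose timestamp arrives as a string of epoch milliseconds (the field names and the parser are assumptions; scala.util.parsing.json ships with the Scala 2.10 standard library used by Spark 1.3.1):

        import scala.util.parsing.json.JSON

        // Sketch only: field names and types are assumptions, not from the report.
        case class Order(customer: String, count: Int, price: Double, timestamp: String)

        case class UserSpending(customer: String, spending: Double, timestamp: Long)

        object Order {
          // Parses an iterator of JSON lines into Orders; lines that do not parse
          // into a JSON object are dropped.
          def apply(lines: Iterator[String]): Iterator[Order] =
            lines.flatMap { line =>
              JSON.parseFull(line).collect {
                case m: Map[String, Any] @unchecked =>
                  Order(
                    m("customer").toString,
                    m("count").toString.toDouble.toInt,   // JSON numbers parse as Double
                    m("price").toString.toDouble,
                    m("timestamp").toString)              // assumed to be epoch millis as a string
              }
            }
        }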
      

      ========================================================
      Updated 2015-05-15:
      I printed how many times the suspect scheduleOnce line in LocalActor::reviveOffers was executed: 1,685,343,501 times after 18 hours of running (about 26,000 invocations per second on average).
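
      The report does not show how this count was collected; one simple way to gather it (purely a sketch, not part of Spark) is a shared atomic counter incremented next to the scheduleOnce call and logged periodically:

        import java.util.concurrent.atomic.AtomicLong

        // Hypothetical instrumentation, not part of Spark: call record() right before
        // the scheduleOnce call in LocalActor.reviveOffers and print total from time
        // to time to see how often the delayed retry is registered.
        object ReviveOffersCounter {
          private val scheduled = new AtomicLong(0)

          def record(): Long = scheduled.incrementAndGet()

          def total: Long = scheduled.get()
        }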


          People

              Assignee: Davies Liu
              Reporter: Jack Hu