SPARK-40932

Barrier: messages for allGather will be overridden by the following barrier APIs

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: 3.3.0, 3.3.1
    • Fix Version/s: 3.3.2, 3.4.0
    • Component/s: Spark Core
    • Labels: None

    Description

      While working on an internal project that has not been open-sourced, I found that the messages returned by BarrierTaskContext.allGather may be overwritten by subsequent barrier API calls, so the user cannot get the correct allGather messages.

      The issue can be reproduced easily with the following unit test.

      import scala.util.Random

      import org.apache.spark.{BarrierTaskContext, SparkConf, SparkContext}

      // Assumes an enclosing Spark test suite that provides the mutable `sc` field.
      test("SPARK-XXX, messages of allGather should not be overridden " +
        "by the following barrier APIs") {
        sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local[2]"))
        sc.setLogLevel("INFO")
        val rdd = sc.makeRDD(1 to 10, 2)
        val rdd2 = rdd.barrier().mapPartitions { it =>
          val context = BarrierTaskContext.get()
          // Sleep for a random time before global sync.
          Thread.sleep(Random.nextInt(1000))
          // Each task contributes its partitionId as its allGather message.
          val message: String = context.partitionId().toString
          val messages: Array[String] = context.allGather(message)
          // A second global sync; after this call `messages` no longer holds the gathered values.
          context.barrier()
          Iterator.single(messages.toList)
        }
        val messages = rdd2.collect()
        // Every task should see the partitionIds of all tasks.
        assert(messages.length === 2)
        messages.foreach(m => println("------- " + m))
        assert(messages.forall(_ == List("0", "1")))
      }

      Before the final assertion, assert(messages.forall(_ == List("0", "1"))), fails, the test prints:

      ------- List(, )
      ------- List(, )

      As the output shows, both tasks see only empty strings: the gathered messages have been overwritten by the subsequent context.barrier() call.

      Below is the Spark log:

      22/10/27 17:03:50.236 Executor task launch worker for task 0.0 in stage 0.0 (TID 1) INFO Executor: Running task 0.0 in stage 0.0 (TID 1)
      22/10/27 17:03:50.236 Executor task launch worker for task 1.0 in stage 0.0 (TID 0) INFO Executor: Running task 1.0 in stage 0.0 (TID 0)
      22/10/27 17:03:50.949 Executor task launch worker for task 0.0 in stage 0.0 (TID 1) INFO BarrierTaskContext: Task 1 from Stage 0(Attempt 0) has entered the global sync, current barrier epoch is 0.
      22/10/27 17:03:50.964 dispatcher-event-loop-1 INFO BarrierCoordinator: Current barrier epoch for Stage 0 (Attempt 0) is 0.
      22/10/27 17:03:50.966 dispatcher-event-loop-1 INFO BarrierCoordinator: Barrier sync epoch 0 from Stage 0 (Attempt 0) received update from Task 1, current progress: 1/2.
      22/10/27 17:03:51.436 Executor task launch worker for task 1.0 in stage 0.0 (TID 0) INFO BarrierTaskContext: Task 0 from Stage 0(Attempt 0) has entered the global sync, current barrier epoch is 0.
      22/10/27 17:03:51.437 dispatcher-event-loop-0 INFO BarrierCoordinator: Current barrier epoch for Stage 0 (Attempt 0) is 0.
      22/10/27 17:03:51.437 dispatcher-event-loop-0 INFO BarrierCoordinator: Barrier sync epoch 0 from Stage 0 (Attempt 0) received update from Task 0, current progress: 2/2.
      22/10/27 17:03:51.440 dispatcher-event-loop-0 INFO BarrierCoordinator: Barrier sync epoch 0 from Stage 0 (Attempt 0) received all updates from tasks, finished successfully.
      22/10/27 17:03:51.958 Executor task launch worker for task 0.0 in stage 0.0 (TID 1) INFO BarrierTaskContext: Task 1 from Stage 0(Attempt 0) finished global sync successfully, waited for 1 seconds, current barrier epoch is 1.
      22/10/27 17:03:51.959 Executor task launch worker for task 0.0 in stage 0.0 (TID 1) INFO BarrierTaskContext: Task 1 from Stage 0(Attempt 0) has entered the global sync, current barrier epoch is 1.
      22/10/27 17:03:51.960 dispatcher-event-loop-1 INFO BarrierCoordinator: Current barrier epoch for Stage 0 (Attempt 0) is 1.
      22/10/27 17:03:51.960 dispatcher-event-loop-1 INFO BarrierCoordinator: Barrier sync epoch 1 from Stage 0 (Attempt 0) received update from Task 1, current progress: 1/2.
      22/10/27 17:03:52.437 Executor task launch worker for task 1.0 in stage 0.0 (TID 0) INFO BarrierTaskContext: Task 0 from Stage 0(Attempt 0) finished global sync successfully, waited for 1 seconds, current barrier epoch is 1.
      22/10/27 17:03:52.438 Executor task launch worker for task 1.0 in stage 0.0 (TID 0) INFO BarrierTaskContext: Task 0 from Stage 0(Attempt 0) has entered the global sync, current barrier epoch is 1.
      22/10/27 17:03:52.438 dispatcher-event-loop-0 INFO BarrierCoordinator: Current barrier epoch for Stage 0 (Attempt 0) is 1.
      22/10/27 17:03:52.439 dispatcher-event-loop-0 INFO BarrierCoordinator: Barrier sync epoch 1 from Stage 0 (Attempt 0) received update from Task 0, current progress: 2/2.
      22/10/27 17:03:52.439 dispatcher-event-loop-0 INFO BarrierCoordinator: Barrier sync epoch 1 from Stage 0 (Attempt 0) received all updates from tasks, finished successfully.
      22/10/27 17:03:52.960 Executor task launch worker for task 0.0 in stage 0.0 (TID 1) INFO BarrierTaskContext: Task 1 from Stage 0(Attempt 0) finished global sync successfully, waited for 1 seconds, current barrier epoch is 2.
      22/10/27 17:03:52.972 Executor task launch worker for task 0.0 in stage 0.0 (TID 1) INFO Executor: Finished task 0.0 in stage 0.0 (TID 1). 1040 bytes result sent to driver
      22/10/27 17:03:52.974 dispatcher-event-loop-1 INFO TaskSchedulerImpl: Skip current round of resource offers for barrier stage 0 because the barrier taskSet requires 2 slots, while the total number of available slots is 1.
      22/10/27 17:03:52.976 task-result-getter-0 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 1) in 2762 ms on 192.168.31.236 (executor driver) (1/2)
      22/10/27 17:03:53.439 Executor task launch worker for task 1.0 in stage 0.0 (TID 0) INFO BarrierTaskContext: Task 0 from Stage 0(Attempt 0) finished global sync successfully, waited for 1 seconds, current barrier epoch is 2.
      22/10/27 17:03:53.445 Executor task launch worker for task 1.0 in stage 0.0 (TID 0) INFO Executor: Finished task 1.0 in stage 0.0 (TID 0). 1040 bytes result sent to driver

       

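      After debugging, I found that the messages object (Array[String]) returned to BarrierTaskContext is the same object as the original messages array rather than a copy. A later barrier call that updates the original array therefore also changes the result that allGather has already handed to the user.

      I will file a PR for this issue.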
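
      The aliasing found during debugging can be illustrated with a minimal, self-contained sketch in plain Scala, without Spark. ToyCoordinator, submit, replyShared, replyCopy and AliasingDemo are hypothetical names invented for this illustration and are not Spark APIs. The sketch only assumes that one mutable array is reused across barrier epochs and that the reply hands over a reference to it.

```scala
// Hypothetical stand-in for a barrier coordinator; NOT Spark's actual class.
final class ToyCoordinator(numTasks: Int) {
  // One slot per task, reused across barrier epochs (an assumption of this sketch).
  private val messages: Array[String] = Array.fill(numTasks)("")

  // Record a task's message for the current epoch.
  def submit(taskId: Int, message: String): Unit = {
    messages(taskId) = message
  }

  // Problematic reply: hands out the internal array itself.
  def replyShared(): Array[String] = messages

  // Safer reply: hands out a snapshot of the current contents.
  def replyCopy(): Array[String] = messages.clone()
}

object AliasingDemo {
  // Returns (result seen through the shared reference, result seen through the copy).
  def run(): (List[String], List[String]) = {
    val coordinator = new ToyCoordinator(2)

    // Epoch 0: an allGather-like sync where each task sends its partition id.
    coordinator.submit(0, "0")
    coordinator.submit(1, "1")
    val shared = coordinator.replyShared()
    val copied = coordinator.replyCopy()

    // Epoch 1: a barrier()-like sync where each task sends an empty message.
    coordinator.submit(0, "")
    coordinator.submit(1, "")

    (shared.toList, copied.toList)
  }

  def main(args: Array[String]): Unit = {
    val (shared, copied) = run()
    println("shared reference: " + shared) // prints "shared reference: List(, )"
    println("defensive copy:   " + copied) // prints "defensive copy:   List(0, 1)"
  }
}
```

      In this sketch the shared reference shows the same List(, ) as the failing test, while the snapshot keeps List(0, 1). If the real cause matches this model, replying with a copy of the array would be one possible direction for a fix; the actual change is left to the PR.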

          People

            Assignee: wbo4958 Bobby Wang
            Reporter: wbo4958 Bobby Wang