Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-31784

Fix test BarrierTaskContextSuite."share messages with allGather() call"

    XMLWordPrintableJSON

    Details

    • Type: Test
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.0.0
    • Fix Version/s: 3.0.0
    • Component/s: Spark Core
    • Labels:
      None
    • Environment:

       

       

       

    • Target Version/s:

      Description

      test("share messages with allGather() call") {  
        val conf = new SparkConf()
            .setMaster("local-cluster[4, 1, 1024]")
            .setAppName("test-cluster")
        sc = new SparkContext(conf)
        val rdd = sc.makeRDD(1 to 10, 4)
        val rdd2 = rdd.barrier().mapPartitions { it =>  
          val context = BarrierTaskContext.get()      
          // Sleep for a random time before global sync.
          Thread.sleep(Random.nextInt(1000))  
          // Pass partitionId message in
          val message: String = context.partitionId().toString
          val messages: Array[String] = context.allGather(message)
          messages.toList.iterator 
         } 
         // Take a sorted list of all the partitionId messages
         val messages = rdd2.collect().head
         // All the task partitionIds are shared
         for((x, i) <- messages.view.zipWithIndex) assert(x.toString == i.toString)
       }
      

      In this test, the desired `messages`(a.k.a rdd2.collect().head) should be ["0", "1", "2", "3"], but is "0" in reality.

        Attachments

          Issue Links

            Activity

              People

              • Assignee:
                Ngone51 wuyi
                Reporter:
                Ngone51 wuyi
              • Votes:
                0 Vote for this issue
                Watchers:
                3 Start watching this issue

                Dates

                • Created:
                  Updated:
                  Resolved: