  1. Spark
  2. SPARK-31784

Fix test BarrierTaskContextSuite."share messages with allGather() call"


Details

    • Type: Test
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.0.0
    • Fix Version/s: 3.0.0
    • Component/s: Spark Core
    • Labels: None

    Description

      test("share messages with allGather() call") {
        val conf = new SparkConf()
          .setMaster("local-cluster[4, 1, 1024]")
          .setAppName("test-cluster")
        sc = new SparkContext(conf)
        val rdd = sc.makeRDD(1 to 10, 4)
        val rdd2 = rdd.barrier().mapPartitions { it =>
          val context = BarrierTaskContext.get()
          // Sleep for a random time before global sync.
          Thread.sleep(Random.nextInt(1000))
          // Pass partitionId message in
          val message: String = context.partitionId().toString
          val messages: Array[String] = context.allGather(message)
          messages.toList.iterator
        }
        // Take a sorted list of all the partitionId messages
        val messages = rdd2.collect().head
        // All the task partitionIds are shared
        for ((x, i) <- messages.view.zipWithIndex) assert(x.toString == i.toString)
      }
      

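      In this test, the intended value of `messages` (i.e. rdd2.collect().head) is ["0", "1", "2", "3"], but it is actually the single string "0". Each task returns messages.toList.iterator from mapPartitions, so the gathered strings become individual RDD elements, collect() yields a flat Array[String], and head is only its first element. The assertion loop then iterates over the characters of "0" and passes trivially, so the test never verifies that every partitionId was shared.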
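
      The flattening described above can be reproduced without Spark. The following is a minimal sketch using plain Scala collections as a stand-in for the four barrier tasks; the object name `AllGatherFlattenDemo` and its fields are hypothetical and not part of the Spark test suite, and it assumes every task receives the same gathered list ["0", "1", "2", "3"].

```scala
object AllGatherFlattenDemo {
  // Assumption: what each barrier task gets back from allGather().
  val gathered: List[String] = List("0", "1", "2", "3")

  // Four partitions, each holding the same gathered list.
  val partitions: Seq[List[String]] = Seq.fill(4)(gathered)

  // Returning `messages.toList.iterator` emits every String as its own
  // record, so collecting the partitions yields one flat sequence.
  val flattened: Seq[String] = partitions.flatMap(_.iterator)

  // Returning `Iterator.single(messages.toList)` emits one record per task.
  val perTask: Seq[List[String]] = partitions.flatMap(p => Iterator.single(p))

  def main(args: Array[String]): Unit = {
    val head: String = flattened.head
    // Same shape as the original check: it only walks the characters of "0",
    // i.e. the single pair ('0', 0), so it cannot fail.
    head.zipWithIndex.foreach { case (c, i) => assert(c.toString == i.toString) }

    // A check that actually covers every task's view of the messages.
    assert(perTask.length == 4)
    assert(perTask.forall(_ == gathered))
  }
}
```

      One possible way to apply the same idea to the test, offered as a suggestion rather than as the committed fix, is to return Iterator.single(messages.toList) from mapPartitions and assert on every element of rdd2.collect() instead of only on its head.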

            People

              Assignee: Ngone51 wuyi
              Reporter: Ngone51 wuyi

              Dates

                Created:
                Updated:
                Resolved: