Spark / SPARK-31784

Fix test BarrierTaskContextSuite."share messages with allGather() call"


    Details

    • Type: Test
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.0.0
    • Fix Version/s: 3.0.0
    • Component/s: Spark Core
    • Labels: None

      Description

      test("share messages with allGather() call") {
        val conf = new SparkConf()
          .setMaster("local-cluster[4, 1, 1024]")
          .setAppName("test-cluster")
        sc = new SparkContext(conf)
        val rdd = sc.makeRDD(1 to 10, 4)
        val rdd2 = rdd.barrier().mapPartitions { it =>
          val context = BarrierTaskContext.get()
          // Sleep for a random time before global sync.
          Thread.sleep(Random.nextInt(1000))
          // Pass partitionId message in
          val message: String = context.partitionId().toString
          val messages: Array[String] = context.allGather(message)
          messages.toList.iterator
        }
        // Take a sorted list of all the partitionId messages
        val messages = rdd2.collect().head
        // All the task partitionIds are shared
        for ((x, i) <- messages.view.zipWithIndex) assert(x.toString == i.toString)
      }
      

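      In this test, the intended value of `messages` (i.e. rdd2.collect().head) is ["0", "1", "2", "3"], but in reality it is just "0". Each task returns messages.toList.iterator, so mapPartitions emits the gathered messages as individual strings and head picks only the first one. The final assertion then iterates over the characters of "0" and passes without checking the other partition IDs.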
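      The flattening can be reproduced without a cluster. The sketch below is a plain-collections illustration, not Spark code: `AllGatherHeadSketch`, `gathered`, `flattened` and `wrapped` are hypothetical names, and `List.fill(4)(...).flatMap(...)` merely stands in for four barrier tasks followed by collect(). Wrapping the gathered list in `Iterator.single` is shown as one possible way to obtain the intended value; it is an assumption for illustration and not necessarily the change that resolved this issue.

```scala
// Plain-collections stand-in for the four barrier tasks; no Spark required.
object AllGatherHeadSketch {
  // Hypothetical stand-in for what allGather() hands back to every task.
  val gathered: List[String] = List("0", "1", "2", "3")

  // As in the test: each task returns messages.toList.iterator, so the
  // collected result is a flat sequence of individual strings.
  val flattened: List[String] = List.fill(4)(gathered).flatMap(_.iterator)

  // Assumed alternative: each task returns one element, the whole list.
  val wrapped: List[List[String]] =
    List.fill(4)(gathered).flatMap(g => Iterator.single(g))

  def main(args: Array[String]): Unit = {
    // head of the flat result is the single string "0" ...
    val first: String = flattened.head
    // ... so the test's loop walks over its characters and passes vacuously.
    val vacuous = first.view.zipWithIndex.forall { case (c, i) => c.toString == i.toString }
    println(s"flattened.head = $first, assertion passes: $vacuous")

    // head of the wrapped result is the full list of partition IDs.
    val all: List[String] = wrapped.head
    val meaningful = all.view.zipWithIndex.forall { case (x, i) => x == i.toString }
    println(s"wrapped.head = $all, assertion passes: $meaningful")
  }
}
```

      With the flat variant the check covers only one character; with the wrapped variant it compares all four partition IDs against their indices.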

            People

            • Assignee: Ngone51 wuyi
            • Reporter: Ngone51 wuyi
