Details
- Test
- Status: Resolved
- Major
- Resolution: Fixed
- 3.0.0
- None
Description
test("share messages with allGather() call") {
  val conf = new SparkConf()
    .setMaster("local-cluster[4, 1, 1024]")
    .setAppName("test-cluster")
  sc = new SparkContext(conf)
  val rdd = sc.makeRDD(1 to 10, 4)
  val rdd2 = rdd.barrier().mapPartitions { it =>
    val context = BarrierTaskContext.get()
    // Sleep for a random time before global sync.
    Thread.sleep(Random.nextInt(1000))
    // Pass partitionId message in
    val message: String = context.partitionId().toString
    val messages: Array[String] = context.allGather(message)
    messages.toList.iterator
  }
  // Take a sorted list of all the partitionId messages
  val messages = rdd2.collect().head
  // All the task partitionIds are shared
  for((x, i) <- messages.view.zipWithIndex) assert(x.toString == i.toString)
}
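In this test, `messages` (i.e. `rdd2.collect().head`) is intended to be ["0", "1", "2", "3"], but it is actually the single string "0". `mapPartitions` concatenates the iterators returned by each partition, so `rdd2` is an RDD of strings and `collect().head` is only the first gathered message. The final loop then iterates over the characters of "0", so the assertion passes without checking that all partition IDs were shared.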
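A minimal sketch of why `rdd2.collect().head` ends up as a single string, using plain Scala collections instead of a Spark cluster. The object name `AllGatherFlattenSketch` and its fields are hypothetical, and the sketch assumes that `allGather` hands every task the same messages in partition order. The last part shows one possible repair (wrapping the gathered array in a single-element result per partition); it is an illustration and not necessarily the change that was merged.

```scala
object AllGatherFlattenSketch {
  val numPartitions = 4

  // Assumed stand-in for the result of allGather() inside each barrier task:
  // every task receives the same array of partitionId messages.
  val gathered: Array[String] = (0 until numPartitions).map(_.toString).toArray

  // mapPartitions concatenates the per-partition iterators, so returning
  // `messages.toList.iterator` yields a flat collection of strings.
  val collected: Array[String] =
    (0 until numPartitions).flatMap(_ => gathered.toList).toArray

  // What the test binds to `messages`: a String, not an Array[String].
  val head: String = collected.head

  // The test's loop walks the characters of `head`, so it only compares
  // the character '0' with index 0 and passes trivially.
  val vacuousCheck: Boolean =
    head.view.zipWithIndex.forall { case (x, i) => x.toString == i.toString }

  // Possible repair (assumption): keep each task's gathered array as one
  // element, then check every task's view of the messages.
  val perPartition: Array[Array[String]] =
    (0 until numPartitions).map(_ => gathered).toArray
  val repairedCheck: Boolean =
    perPartition.forall(_.toList == List("0", "1", "2", "3"))

  def main(args: Array[String]): Unit = {
    println(s"head = $head")
    println(s"vacuousCheck = $vacuousCheck")
    println(s"repairedCheck = $repairedCheck")
  }
}
```

In the original test, the analogous change would be to return a single-element iterator holding the gathered array from `mapPartitions` and to assert on each collected array, so that every task's result is compared against all four partition IDs.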
Issue Links
- is caused by
  - SPARK-30667 Support simple all gather in barrier task context (Resolved)