- Type: Test
- Status: Resolved
- Priority: Major
- Resolution: Fixed
- Affects Version/s: 3.0.0
- Fix Version/s: 3.0.0
- Component/s: Spark Core
- Labels: None
- Environment:
- Target Version/s:
test("share messages with allGather() call") { val conf = new SparkConf() .setMaster("local-cluster[4, 1, 1024]") .setAppName("test-cluster") sc = new SparkContext(conf) val rdd = sc.makeRDD(1 to 10, 4) val rdd2 = rdd.barrier().mapPartitions { it => val context = BarrierTaskContext.get() // Sleep for a random time before global sync. Thread.sleep(Random.nextInt(1000)) // Pass partitionId message in val message: String = context.partitionId().toString val messages: Array[String] = context.allGather(message) messages.toList.iterator } // Take a sorted list of all the partitionId messages val messages = rdd2.collect().head // All the task partitionIds are shared for((x, i) <- messages.view.zipWithIndex) assert(x.toString == i.toString) }
In this test, the desired `messages` (a.k.a. `rdd2.collect().head`) should be `["0", "1", "2", "3"]`, but is `"0"` in reality. Because `rdd2.collect()` flattens the gathered messages from every task into a single array, `.head` is only the first gathered message, so the assertion loop iterates over the characters of that one string rather than over all partition ids.
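For reference, a minimal sketch of a stricter assertion, assuming the intent is that every task should receive all four partition ids from `allGather()`; the names `gathered` and `expected` are illustrative and not part of the original test:

```scala
// Hypothetical stricter check (illustrative, not the original test code):
// each task emits the full array returned by allGather(), so the collected
// messages, de-duplicated, should cover all four partition ids.
val gathered = rdd2.collect()
val expected = (0 until 4).map(_.toString).toSet
assert(gathered.toSet == expected)
```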
- is caused by: SPARK-30667 Support simple all gather in barrier task context (Resolved)