Details
Type: Improvement
Status: Resolved
Priority: Major
Resolution: Fixed
Description
Currently (as of Spark 1.0.1), Spark uses Akka to send the RDD object (which contains closures) to the executors along with the task itself. This is inefficient because all tasks in the same stage use the same RDD object, yet the RDD object is sent to the executors once per task. This is especially bad when a closure references a very large variable. The current design has led users to broadcast large variables explicitly.
The patch uses broadcast to send RDD objects and their closures to executors, and uses Akka only to send a reference to the broadcast RDD/closure along with the partition-specific information for the task. For those who know more about the internals, Spark already relies on broadcast to send the Hadoop JobConf every time it uses Hadoop input, because the JobConf is large.
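To make the size argument concrete, here is a toy model in plain Scala (no Spark dependency). It is only a sketch: `FatTask`, `SlimTask`, `serializedSize`, and the executor count are hypothetical names and values chosen for illustration, not Spark's internal classes, and Java serialization stands in for whatever serializer is actually configured.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Toy model only: these types are hypothetical and are not Spark's internal classes.
object TaskSizeSketch {
  // Old scheme: every task carries the full stage payload (RDD plus closures).
  final case class FatTask(partition: Int, stagePayload: Array[Byte])

  // New scheme: every task carries only an id that refers to the broadcast payload.
  final case class SlimTask(partition: Int, broadcastId: Long)

  // Size in bytes of an object under plain Java serialization.
  def serializedSize(obj: AnyRef): Int = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(obj)
    out.close()
    buffer.size()
  }

  def main(args: Array[String]): Unit = {
    val payload = new Array[Byte](1000 * 1000) // stands in for a closure capturing ~1 MB
    val numTasks = 1000
    val numExecutors = 3 // assumption for illustration

    // Payload embedded in every task message.
    val fatTotal = serializedSize(FatTask(0, payload)).toLong * numTasks

    // Payload fetched once per executor, plus one small message per task.
    val slimTotal =
      serializedSize(payload).toLong * numExecutors +
        serializedSize(SlimTask(0, 0L)).toLong * numTasks

    println(s"payload in every task:            $fatTotal bytes")
    println(s"broadcast payload + slim tasks:   $slimTotal bytes")
  }
}
```

In this model the per-task message shrinks from roughly the size of the payload to a few hundred bytes at most, which is the effect the patch aims for on the task dispatch path.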
The user-facing impact of the change includes:
- Users won't need to decide what to broadcast anymore, unless they want to use a large object multiple times in different operations.
- Task size will get smaller, resulting in faster scheduling and higher task dispatch throughput.
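For the remaining case in the first point, where a large object is reused across several operations, an explicit broadcast might look like the sketch below. It uses the public `SparkContext.broadcast` API. The local master, the app name, the object name, and the deterministic array contents are assumptions made so the example can run standalone.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ExplicitBroadcastSketch {
  // Runs two separate jobs that both read the same broadcast value.
  def run(sc: SparkContext): (Long, Long) = {
    // Deterministic 1 MB array (assumption; the ticket's test uses random bytes).
    val a = Array.tabulate[Byte](1000 * 1000)(i => (i % 128).toByte)

    // Broadcast once; both jobs below reuse it instead of capturing `a` in each closure.
    val bc = sc.broadcast(a)

    val evens = sc.parallelize(1 to 1000, 100).filter(x => bc.value(x) % 2 == 0).count()
    val total = sc.parallelize(1 to 1000, 100).map(x => bc.value(x).toLong).reduce(_ + _)
    (evens, total)
  }

  def main(args: Array[String]): Unit = {
    // Local master and app name are assumptions for a standalone run.
    val conf = new SparkConf().setMaster("local[2]").setAppName("explicit-broadcast-sketch")
    val sc = new SparkContext(conf)
    try {
      val (evens, total) = run(sc)
      println(s"evens=$evens total=$total")
    } finally {
      sc.stop()
    }
  }
}
```

The design point is that the broadcast handle `bc` is created once and shared by both jobs, so the large array does not have to be shipped again for the second operation.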
In addition, the change will simplify some internals of Spark, eliminating the need to maintain task caches and the complex logic to broadcast JobConf (which also led to a deadlock recently).
A simple way to test this:
val a = new Array[Byte](1000*1000)
scala.util.Random.nextBytes(a)
sc.parallelize(1 to 1000, 1000).map { x => a; x }.groupBy { x => a; x }.count

Numbers on 3 r3.8xlarge instances on EC2:
master branch: 5.648436068 s, 4.715361895 s, 5.360161877 s
with this change: 3.416348793 s, 1.477846558 s, 1.553432156 s
Issue Links
- is required by: SPARK-2546 Configuration object thread safety issue (Resolved)
- relates to: SPARK-2361 Decide whether to broadcast or serialize the weights directly in MLlib algorithms (Resolved)
- links to