Details
-
Improvement
-
Status: Resolved
-
Major
-
Resolution: Duplicate
-
2.4.4, 3.0.0
-
None
-
None
-
Patch
Description
Both RDD and Dataset APIs have 2 methods of collecting data from executors to driver:
- .collect() setup multiple threads in a job and dump all data from executor into drivers memory. This is great if data on driver needs to be accessible ASAP, but not as efficient if access to partitions can only happen sequentially, and outright risky if driver doesn't have enough memory to hold all data.
- the solution for issue
SPARK-25224partially alleviate this by delaying deserialisation of data in InternalRow format, such that only the much smaller serialised data needs to be entirely hold by driver memory. This solution does not abide O(1) memory consumption, thus does not scale to arbitrarily large dataset
- .toLocalIterator() fetch one partition in 1 job at a time, and fetching of the next partition does not start until sequential access to previous partition has concluded. This action abides O(1) memory consumption and is great if access to data is sequential and significantly slower than the speed where partitions can be shipped from a single executor, with 1 thread. It becomes inefficient when the sequential access to data has to wait for a relatively long time for the shipping of the next partition
The proposed solution is a crossover between two existing implementations: a concurrent subroutine that is both CPU and memory bounded. The solution allocate a fixed sized resource pool (by default = number of available CPU cores) that serves the shipping of partitions concurrently, and block sequential access to partitions' data until shipping is finished (which usually happens without blocking for partitionID >=2 due to the fact that shipping start much earlier and preemptively). Tenants of the resource pool can be GC'ed and evicted once sequential access to it's data has finished, which allows more partitions to be fetched much earlier than they are accessed. The maximum memory consumption is O(m * n), where m is the predefined concurrency and n is the size of the largest partition.
The following scala code snippet demonstrates a simple implementation:
(requires scala 2.11 + and ScalaTests)
package org.apache.spark.spike import java.util.concurrent.ArrayBlockingQueue import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import org.apache.spark.{FutureAction, SparkContext} import org.scalatest.FunSpec import scala.concurrent.Future import scala.language.implicitConversions import scala.reflect.ClassTag import scala.util.{Failure, Success, Try} class ToLocalIteratorPreemptivelySpike extends FunSpec { import ToLocalIteratorPreemptivelySpike._ lazy val sc: SparkContext = SparkSession.builder().master("local[*]").getOrCreate().sparkContext it("can be much faster than toLocalIterator") { val max = 80 val delay = 100 val slowRDD = sc.parallelize(1 to max, 8).map { v => Thread.sleep(delay) v } val (r1, t1) = timed { slowRDD.toLocalIterator.toList } val capacity = 4 val (r2, t2) = timed { slowRDD.toLocalIteratorPreemptively(capacity).toList } assert(r1 == r2) println(s"linear: $t1, preemptive: $t2") assert(t1 > t2 * 2) assert(t2 > max * delay / capacity) } } object ToLocalIteratorPreemptivelySpike { case class PartitionExecution[T: ClassTag]( @transient self: RDD[T], id: Int ) { def eager: this.type = { AsArray.future this } case object AsArray { @transient lazy val future: FutureAction[Array[T]] = { var result: Array[T] = null val future = self.context.submitJob[T, Array[T], Array[T]]( self, _.toArray, Seq(id), { (_, data) => result = data }, result ) future } @transient lazy val now: Array[T] = future.get() } } implicit class RDDFunctions[T: ClassTag](self: RDD[T]) { import scala.concurrent.ExecutionContext.Implicits.global def _toLocalIteratorPreemptively(capacity: Int): Iterator[Array[T]] = { val executions = self.partitions.indices.map { ii => PartitionExecution(self, ii) } val buffer = new ArrayBlockingQueue[Try[PartitionExecution[T]]](capacity) Future { executions.foreach { exe => buffer.put(Success(exe)) // may be blocking due to capacity exe.eager // non-blocking } }.onFailure { case e: Throwable => buffer.put(Failure(e)) } self.partitions.indices.toIterator.map { _ => val exe = buffer.take().get exe.AsArray.now } } def toLocalIteratorPreemptively(capacity: Int): Iterator[T] = { _toLocalIteratorPreemptively(capacity).flatten } } def timed[T](fn: => T): (T, Long) = { val startTime = System.currentTimeMillis() val result = fn val endTime = System.currentTimeMillis() (result, endTime - startTime) } }
Attachments
Issue Links
- duplicates
-
SPARK-27025 Speed up toLocalIterator
- Resolved