Details
- Type: Improvement
- Status: Closed
- Priority: Major
- Resolution: Won't Do
- Affects Version/s: 3.4.0
- Fix Version/s: None
- Component/s: None
Description
When incoming rates are uneven and scheduling delays are high, streaming keeps adding microbatches to the event loop and submitting jobs to the job thread executor. As a result, pending microbatches in the Spark streaming Kafka direct stream hold fewer offset ranges whenever the Kafka lag is below the configured per-partition maximum.
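To make the shrinking batches concrete, here is a simplified model of how a direct Kafka stream sizes a batch. This is a sketch only; Spark's actual logic lives in DirectKafkaInputDStream and also accounts for backpressure.

```scala
// Simplified model: a batch takes at most maxRatePerPartition * batchSeconds
// offsets per partition, or the remaining lag if that is smaller.
def offsetsForBatch(lag: Long, maxRatePerPartition: Long, batchSeconds: Long): Long =
  math.min(lag, maxRatePerPartition * batchSeconds)
```

With a 1s batch and a 10k/s per-partition cap, a backlog of 50k offsets is clamped to 10k, but once lag drops to 300, the next pending batch holds only 300 offsets.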
We rely on a third-party service to add metadata to incoming records, and its response time is roughly constant regardless of microbatch size, so small microbatches increase per-record latency further. In our case, an RDD's metadata is fetched during the transform phase (for various reasons), which executes when the microbatch is scheduled. Our RDD transform, at a high level:
val dstreams = ...
dstreams.transform { rdd =>
  val uniqueItems = rdd.map(...).distinct().collect()
  val metadata = getMetadata(uniqueItems)  // call out to the third-party service
  val rddWithMetadata = rdd.map(...)       // attach metadata to each record
  rddWithMetadata
}
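Since the metadata call costs roughly the same per batch, the per-record overhead grows as batches shrink. A minimal sketch of that amortization (illustrative names and numbers, not measurements from our service):

```scala
// Fixed per-batch metadata round trip amortized over the batch's records.
def perRecordOverheadMs(metadataCallMs: Double, batchSize: Long): Double =
  metadataCallMs / batchSize
```

For example, a 200ms metadata call amortized over 10,000 records adds 0.02ms per record, but over 100 records it adds 2ms per record, a 100x increase.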
Scheduling many small microbatches can be avoided by skipping new jobs when there are sufficient pending jobs in the queue.
Proposed change in JobGenerator.scala, at a high level:

val maxPendingJobs = ssc.sc.conf.getInt("spark.streaming.maxPendingBatches", -1)

private def processEvent(event: JobGeneratorEvent): Unit = {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) =>
      if (maxPendingJobs == -1 || jobScheduler.getPendingTimes().size < maxPendingJobs) {
        generateJobs(time)
      } else {
        logWarning("Skipping JobGenerator at " + time)
        // TODO: add pending times in queue to log.
      }
    // other current cases
    case ...
  }
}
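The gating condition can be sketched in isolation (hypothetical helper name; in the actual change it would be inlined in the event handling above):

```scala
// Sketch of the proposed gate: generation proceeds when the feature is
// disabled (-1) or the pending-batch queue is below the configured limit.
def shouldGenerateJobs(maxPendingJobs: Int, pendingBatches: Int): Boolean =
  maxPendingJobs == -1 || pendingBatches < maxPendingJobs
```

With the default of -1, behavior is unchanged; with a limit of 5, the fifth pending batch suppresses further generation until the queue drains.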