Details
- Type: Improvement
- Status: Open
- Priority: Minor
- Resolution: Unresolved
- Affects Version/s: 3.1.0
- Fix Version/s: None
- Component/s: None
Description
The current logic for determining maxMessagesPerPartition produces non-uniform message counts per partition, proportional to the lag of each partition.
```scala
val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
```

```scala
if (effectiveRateLimitPerPartition.values.sum > 0) {
  val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
  Some(effectiveRateLimitPerPartition.map {
    case (tp, limit) => tp -> (secsPerBatch * limit).toLong
  })
}
```
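The skew this produces can be reproduced with a small standalone sketch. The helper below is illustrative only (the function name and `Map`-based signature are not Spark's internals); it applies the same proportional formula to the two-partition example discussed in this ticket:

```scala
// Illustrative sketch of the proportional per-partition cap described above.
// maxMessagesPerPartition here is a hypothetical helper, not Spark's method.
def maxMessagesPerPartition(
    lags: Map[String, Long],     // topic-partition -> lag
    rate: Long,                  // configured maxRatePerPartition
    batchDurationSec: Double): Map[String, Long] = {
  val totalLag = lags.values.sum
  lags.map { case (tp, lag) =>
    // Each partition's cap is proportional to its share of the total lag.
    tp -> Math.round(lag / totalLag.toFloat * rate / batchDurationSec)
  }
}

val limits = maxMessagesPerPartition(
  Map("t-0" -> 10000L, "t-1" -> 100L), rate = 1000L, batchDurationSec = 10.0)
println(limits)  // heavily skewed: t-0 -> 99 messages, t-1 -> 1
```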
This wastes resources: cores assigned partitions with few messages finish early and sit idle until the cores processing larger partitions complete their tasks.

Consider a topic t with 2 partitions:
| Topic | Partition | Start Offset | End Offset | Current Offset |
|---|---|---|---|---|
| t | 0 | 0 | 10000 | 0 |
| t | 1 | 0 | 100 | 0 |
with maxRatePerPartition = 1000 and a batch duration of 10 seconds.
As per the calculation:

maxMessages for partition 0 -> (10000 / 10100) * 1000 / (batchDuration = 10) ≈ 99
maxMessages for partition 1 -> (100 / 10100) * 1000 / (batchDuration = 10) ≈ 1
If the application runs on 2 cores, one core finishes after processing the single record of partition 1 and then sits idle while the other core processes the 99 records of partition 0, before the next RDD can be picked up. In this example the second core is idle for roughly 98% of the batch's processing window.
Enforcing a uniform batch size across the partitions of each RDD avoids this waste.

In the above case, we can set the batch size for each partition to max(batch size of all partitions), i.e. 99:
maxMessage for part-0 = 99
maxMessage for part-1 = 99
This lets us process 98 more records from partition 1 in the same time without wasting any resources.
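The proposed uniform cap can be sketched the same way (again with an illustrative helper name, not proposed Spark API): take the maximum per-partition limit and apply it to every partition.

```scala
// Illustrative sketch of the proposed fix: give every partition the
// largest cap computed for any partition, so batch sizes are uniform.
def uniformMaxMessages(limits: Map[String, Long]): Map[String, Long] = {
  val cap = limits.values.max
  limits.map { case (tp, _) => tp -> cap }
}

println(uniformMaxMessages(Map("t-0" -> 99L, "t-1" -> 1L)))
// both partitions are capped at 99, so neither core idles
```

Note that the total cap per batch grows under this scheme (here 198 instead of 100), so maxRatePerPartition still bounds each partition individually but no longer the batch as a whole; that trade-off is the point of the proposal.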