Currently, centralized opportunistic container (OContainer) allocation considers only the container queue length on nodes.
However, production experience has shown that relying solely on queue length for allocating OContainers can lead to very high queue times.
Specifically, while we rely on queue length to sort the candidate nodes for OContainer allocation, the queue length used in the computation can be stale, depending on NM heartbeat frequency, leading to an exceedingly high number of containers queued on nodes. This excessive queueing can leave queued vertices on the critical path of a job blocking the rest of the job's vertices. Here's an example illustrating what can happen:
Suppose an NM can run 3 containers, and has no allocated containers to start with.
AM -> RM: heartbeat, allocate 6 containers
RM.NodeQueueLoadMonitor: nothing allocated on NM now, mark 3 containers on NM as allocated to AM, 3 containers from AM outstanding.
RM -> AM: allocate 3 containers on NM
NM -> RM: heartbeat with no containers allocated
RM.NodeQueueLoadMonitor: mark 0 containers allocated on NM
AM -> NM: run 3 containers
NM -> AM: ACK, running 3 containers, no more capacity on NM
AM -> RM: heartbeat
RM.NodeQueueLoadMonitor: nothing allocated on NM now, mark 3 more containers on NM as allocated to AM (in reality, there should be 3 containers already allocated on NM, and the 3 marked in this heartbeat will be queued), no containers from AM outstanding.
RM -> AM: allocate the 3 remaining containers on NM
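The race above can be sketched as a small, self-contained simulation. This is a hypothetical, heavily simplified model (the class and method names are illustrative, not the actual YARN code): the RM's view of the node is refreshed only on NM heartbeats, so containers started between heartbeats are invisible to the next allocation decision.

```java
// Simplified model of the stale-queue-length race described above.
// Assumption: the RM tracks one node and overwrites its view of that
// node's load only when an NM heartbeat arrives.
public class StaleQueueLengthDemo {
    static final int NODE_CAPACITY = 3;

    static int rmViewOfAllocated = 0;  // RM-side snapshot, updated on NM heartbeat
    static int nmActuallyRunning = 0;  // ground truth on the NM
    static int nmQueued = 0;           // containers the NM had to queue

    // AM -> RM: RM grants containers based on its (possibly stale) view.
    static int rmAllocate(int requested) {
        int free = NODE_CAPACITY - rmViewOfAllocated;
        int granted = Math.min(requested, free);
        rmViewOfAllocated += granted;
        return granted;
    }

    // NM -> RM: heartbeat replaces the RM's view with what the NM reports.
    static void nmHeartbeat() {
        rmViewOfAllocated = nmActuallyRunning;
    }

    // AM -> NM: start containers; anything over capacity is queued.
    static void amStartContainers(int n) {
        int runnable = Math.min(n, NODE_CAPACITY - nmActuallyRunning);
        nmActuallyRunning += runnable;
        nmQueued += n - runnable;
    }

    public static void main(String[] args) {
        int granted1 = rmAllocate(6); // RM grants 3; 3 remain outstanding
        nmHeartbeat();                // NM reports 0 running; RM view reset to 0
        amStartContainers(granted1);  // NM now runs 3 and is full
        int granted2 = rmAllocate(3); // stale view sees an empty node: grants 3 more
        amStartContainers(granted2);  // none can run; all 3 are queued
        System.out.println("queued=" + nmQueued); // queued=3
    }
}
```

Running the sequence in the original order shows the second batch of 3 containers ending up queued on an already-full node, purely because the heartbeat-driven snapshot was out of date.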
We've found that unlucky OContainers can be queued behind vertices that run for an excessively long time, forcing them to wait a long time before they can begin to run. This led to certain jobs experiencing a more than 3x increase in median run time. In these cases, it is better for the OContainer to remain unallocated at the RM until NMs report available resources.
To address this, we propose a new allocation policy, QUEUE_LENGTH_THEN_RESOURCES, offered alongside the original QUEUE_LENGTH policy, that also considers the available node resources when allocating OContainers.
It first sorts the candidate nodes by queue length in ascending order, then breaks ties by available resources.
Akin to the QUEUE_LENGTH policy, the new QUEUE_LENGTH_THEN_RESOURCES policy still picks a random node out of the top K (configurable) candidates for allocation.
If the maximum queue length on nodes is zero, the policy simply waits until a node reports available resources before running OContainers.
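The proposed ordering and top-K random pick can be sketched with a comparator. This is an illustrative sketch under stated assumptions, not the actual YARN implementation: `CandidateNode` and `pickNode` are hypothetical names, ties on queue length are assumed to be broken in favor of nodes with more available resources, and a single memory figure stands in for the full Resource comparison.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Random;

// Sketch of the QUEUE_LENGTH_THEN_RESOURCES ordering: ascending queue
// length first, then more available resources first. Names are
// illustrative, not the actual YARN classes.
public class QueueLengthThenResources {
    static class CandidateNode {
        final String id;
        final int queueLength;        // queued OContainers reported by the NM
        final long availableMemoryMb; // stand-in for the full Resource comparison
        CandidateNode(String id, int queueLength, long availableMemoryMb) {
            this.id = id;
            this.queueLength = queueLength;
            this.availableMemoryMb = availableMemoryMb;
        }
    }

    static final Comparator<CandidateNode> ORDERING =
        Comparator.comparingInt((CandidateNode n) -> n.queueLength)
                  .thenComparing(n -> n.availableMemoryMb, Comparator.reverseOrder());

    // Sort the candidates, then pick a random node among the best K,
    // mirroring the existing QUEUE_LENGTH behavior.
    static CandidateNode pickNode(List<CandidateNode> nodes, int k, Random rng) {
        List<CandidateNode> sorted = new ArrayList<>(nodes);
        sorted.sort(ORDERING);
        int limit = Math.min(k, sorted.size());
        return sorted.get(rng.nextInt(limit));
    }

    public static void main(String[] args) {
        List<CandidateNode> nodes = List.of(
            new CandidateNode("n1", 2, 4096),
            new CandidateNode("n2", 0, 1024),
            new CandidateNode("n3", 0, 8192));
        // With K = 1, the single best node wins: queue length 0, and the
        // queue-length tie with n2 is broken by more free memory.
        System.out.println(pickNode(nodes, 1, new Random()).id); // n3
    }
}
```

With K greater than 1, the random pick among the K best nodes spreads load and avoids herding all OContainers onto the single emptiest node.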
This JIRA issue will serve as an umbrella issue that encapsulates sub-tasks for this work.