Thanks for the review, Junping!
I see we check canAssignMaps() three times, in the while loops of assignMapsWithLocality(). Can we simply return at the beginning of the method if this is false?
canAssignMaps() is not a constant predicate, and it's important that it remain a condition on each of the three while loops: as we assign more maps we can end up hitting the configured task limit partway through the method.
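As a rough illustration (a minimal sketch, not the actual Hadoop source; canAssignMaps(), tryNodeLocal(), and tryRackLocal() here are hypothetical stand-ins for the allocator internals):

{code:java}
import java.util.Iterator;
import java.util.List;

class MapAssigner {
  private int assignedMaps = 0;
  private final int mapLimit;          // configured map limit

  MapAssigner(int mapLimit) { this.mapLimit = mapLimit; }

  // Not a constant predicate: its answer changes as maps are assigned.
  private boolean canAssignMaps() {
    return assignedMaps < mapLimit;
  }

  void assignMapsWithLocality(List<String> containers) {
    // Pass 1: node-local. The guard is re-evaluated on every iteration
    // because each assignment consumes part of the limit.
    Iterator<String> it = containers.iterator();
    while (it.hasNext() && canAssignMaps()) {
      if (tryNodeLocal(it.next())) { it.remove(); assignedMaps++; }
    }
    // Pass 2: rack-local. The limit may have been reached in pass 1.
    it = containers.iterator();
    while (it.hasNext() && canAssignMaps()) {
      if (tryRackLocal(it.next())) { it.remove(); assignedMaps++; }
    }
    // Pass 3: any remaining containers, same guard for the same reason.
    it = containers.iterator();
    while (it.hasNext() && canAssignMaps()) {
      it.next(); it.remove(); assignedMaps++;
    }
  }

  private boolean tryNodeLocal(String c) { return false; } // hypothetical
  private boolean tryRackLocal(String c) { return false; } // hypothetical
}
{code}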
We could add an "early out", since once canAssignMaps() returns false it will not return true for the remainder of the method. However, it's a very cheap function to evaluate, so the optimization would save hardly any computation, and we'd be optimizing for the uncommon case where the RM handed us more containers than we're supposed to assign. In the common case the list of containers to assign is empty, which already precludes the need to call canAssignMaps(). Since the early out at the beginning of the method saves so little, I didn't make this change. If it's important that this be done and I missed something, please let me know.
Any special reason to use Integer instead of int for the limit? If not, I think using int would be slightly better, since then we could use "if (limit <= 0)". Wouldn't it?
This was an artifact of an earlier design, and I see no reason to use Integer. Changed to int. However, I left the check as "if (limit < 0)" because there are cases where we want the limit to really be 0 (i.e., to not request any more containers of that priority).
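Roughly, the intended semantics (a sketch with hypothetical names, not the patch itself):

{code:java}
class LimitCheck {
  // limit < 0  : no limit configured, leave the ask untouched
  // limit == 0 : a real, meaningful cap: request zero more containers
  static int cappedAsk(int currentAsk, int limit) {
    if (limit < 0) {
      return currentAsk;
    }
    return Math.min(currentAsk, limit);
  }
}
{code}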
Why do we only filter requests with "Locality.ANY"?
We only need to check ANY because of the way the YARN AM-RM protocol works. ANY limits the total number of containers the application is requesting. Any new container being requested, whether node-local or rack-local, requires the ANY count to be incremented by one so the RM knows it needs to allocate one more container. The RM will only grant as many containers as the app requested at the ANY level, regardless of how many it asked for at the rack or node level.
It's important that we only limit at the ANY level because the AM has no idea what locality is available on the cluster. Essentially, what we're doing with this patch is giving the RM the full information about what we want with respect to locality, but also telling it not to give us all of those containers right now. That allows the RM to decide which containers to grant, and it will try to do its best with locality.

The problem with having the AM limit tasks at the rack or node level is that it has no idea what locality is available at the time; only the RM does. For example, if the AM wants to run two map tasks but is configured to run only one, it would have to decide which map task to request. However, it doesn't know which task has availability on the cluster. It could end up asking for only task 1, not realizing that task 1 has no locality available right now (its node/rack is full) but task 2 does. By telling the RM we want nodes for both task 1 and task 2, but only allowing it to give us one container right now, the RM is able to decide which node to allocate based on the requested locality. The AM is therefore more likely to get good locality with this approach than by trying to limit its task requests itself.
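To make that example concrete, here's a minimal sketch of what the asks might look like using the YARN records API (the priority, container size, host, and rack names are all made up):

{code:java}
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;

class LocalityAskExample {
  // Two map tasks wanted but only one allowed: describe the preferred
  // locality of both tasks, then cap the ANY count at 1 so the RM
  // decides which placement to satisfy.
  static List<ResourceRequest> buildAsks() {
    Priority pri = Priority.newInstance(20);       // illustrative
    Resource cap = Resource.newInstance(1024, 1);  // illustrative
    List<ResourceRequest> asks = new ArrayList<>();
    asks.add(ResourceRequest.newInstance(pri, "host1", cap, 1));  // task 1
    asks.add(ResourceRequest.newInstance(pri, "host2", cap, 1));  // task 2
    asks.add(ResourceRequest.newInstance(pri, "/rack1", cap, 2));
    // ANY ("*") bounds the total grant; without a limit this would be 2.
    asks.add(ResourceRequest.newInstance(pri, ResourceRequest.ANY, cap, 1));
    return asks;
  }
}
{code}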
In addition, it sounds like we use Integer.MAX_VALUE as a marker for "no limit". Can we check it earlier and return?
We can't early-out there either, because if a limit is being removed (i.e., set to Integer.MAX_VALUE) we still need to send the original, unfiltered ANY ask. Failing to do so means the RM would never see the updated ANY ask, and the old limit would effectively remain in place.
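In sketch form (class and helper names are hypothetical): the filter only touches ANY-level asks, and a MAX_VALUE limit leaves the ask at its original value but still sends it, which is exactly what lifts a previously advertised limit:

{code:java}
import org.apache.hadoop.yarn.api.records.ResourceRequest;

class AskFilter {
  static void capAsk(ResourceRequest ask, int limit) {
    if (!ResourceRequest.ANY.equals(ask.getResourceName())) {
      return;  // node- and rack-level asks pass through untouched
    }
    if (ask.getNumContainers() > limit) {
      ask.setNumContainers(limit);
    }
    // With limit == Integer.MAX_VALUE nothing is capped, but the ask is
    // still transmitted so the RM sees the restored ANY count.
  }
}
{code}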