diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/OpportunisticContainersStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/OpportunisticContainersStatus.java
index 732db2a80ab..aa8f7a83ec6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/OpportunisticContainersStatus.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/OpportunisticContainersStatus.java
@@ -149,4 +149,23 @@ public static OpportunisticContainersStatus newInstance() {
   @Unstable
   public abstract void setEstimatedQueueWaitTime(int queueWaitTime);
 
+
+  /**
+   * Gets the max length of the opportunistic container queue on the node.
+   *
+   * @return max queue length.
+   */
+  @Private
+  @Unstable
+  public abstract int getMaxOpportQueueLength();
+
+
+  /**
+   * Sets the max length of the opportunistic container queue on the node.
+   *
+   * @param maxOppQueueLength max queue length.
+   */
+  @Private
+  @Unstable
+  public abstract void setMaxOpportQueueLength(int maxOppQueueLength);
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/OpportunisticContainersStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/OpportunisticContainersStatusPBImpl.java
index 8399713e7b5..ed61cc2955c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/OpportunisticContainersStatusPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/OpportunisticContainersStatusPBImpl.java
@@ -136,4 +136,17 @@ public void setEstimatedQueueWaitTime(int queueWaitTime) {
     maybeInitBuilder();
     builder.setEstimatedQueueWaitTime(queueWaitTime);
   }
+
+  @Override
+  public int getMaxOpportQueueLength() {
+    YarnServerCommonProtos.OpportunisticContainersStatusProtoOrBuilder p =
+        viaProto ? proto : builder;
+    return p.getMaxOpportQueueLength();
+  }
+
+  @Override
+  public void setMaxOpportQueueLength(int maxOpportQueueLength) {
+    maybeInitBuilder();
+    builder.setMaxOpportQueueLength(maxOpportQueueLength);
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
index 98b172d4a35..7fa46312477 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
@@ -49,6 +49,7 @@ message OpportunisticContainersStatusProto {
   optional int32 queued_opport_containers = 4;
   optional int32 wait_queue_length = 5;
   optional int32 estimated_queue_wait_time = 6;
+  optional int32 max_opport_queue_length = 7;
 }
 
 message MasterKeyProto {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
index 76da37c5c34..bd236385ef6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
@@ -258,6 +258,15 @@ public int getNumQueuedContainers() {
         + this.queuedOpportunisticContainers.size();
   }
 
+  /**
+   * Returns the max opportunistic queue length held by this scheduler.
+   *
+   * @return max opportunistic queue length.
+   */
+  public int getMaxOppQueueLength() {
+    return this.maxOppQueueLength;
+  }
+
   @VisibleForTesting
   public int getNumQueuedGuaranteedContainers() {
     return this.queuedGuaranteedContainers.size();
@@ -290,6 +299,8 @@ public OpportunisticContainersStatus getOpportunisticContainersStatus() {
         metrics.getAllocatedOpportunisticVCores());
     this.opportunisticContainersStatus.setRunningOpportContainers(
         metrics.getRunningOpportunisticContainers());
+    this.opportunisticContainersStatus.setMaxOpportQueueLength(
+        getMaxOppQueueLength());
     return this.opportunisticContainersStatus;
   }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
index ed0ee1ec6f5..e80ad44a6ac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
@@ -38,6 +38,7 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 
 /**
  * The NodeQueueLoadMonitor keeps track of load metrics (such as queue length
@@ -75,6 +76,7 @@ public int getMetric(ClusterNode c) {
     int queueWaitTime = -1;
     double timestamp;
     final NodeId nodeId;
+    int maxQueueLength = 0;
 
     public ClusterNode(NodeId nodeId) {
       this.nodeId = nodeId;
@@ -95,6 +97,31 @@ public ClusterNode updateTimestamp() {
       this.timestamp = System.currentTimeMillis();
       return this;
     }
+
+    /**
+     * Records the maximum opportunistic queue length reported for the node.
+     *
+     * @param maxQueueLength max queue length; a value of 0 or less means
+     *                       that no limit is known.
+     * @return this ClusterNode, so that setters can be chained.
+     */
+    public ClusterNode setMaxQueueLength(int maxQueueLength) {
+      this.maxQueueLength = maxQueueLength;
+      return this;
+    }
+
+    /**
+     * Tells whether the node's queue has reached its known limit. A limit
+     * of 0 or less means none was reported (0 is both this field's default
+     * and the default of the optional protobuf field), so such a node is
+     * never treated as full.
+     *
+     * @return true if a positive limit is known and has been reached.
+     */
+    public boolean isQueueFull() {
+      return this.maxQueueLength > 0
+          && this.queueLength >= this.maxQueueLength;
+    }
   }
 
   private final ScheduledExecutorService scheduledExecutor;
@@ -207,6 +234,8 @@ public void updateNode(RMNode rmNode) {
       opportunisticContainersStatus =
           OpportunisticContainersStatus.newInstance();
     }
+    int maxOpportQueueLength =
+        opportunisticContainersStatus.getMaxOpportQueueLength();
     int estimatedQueueWaitTime =
         opportunisticContainersStatus.getEstimatedQueueWaitTime();
     int waitQueueLength = opportunisticContainersStatus.getWaitQueueLength();
@@ -222,7 +251,8 @@ public void updateNode(RMNode rmNode) {
           this.clusterNodes.put(rmNode.getNodeID(),
               new ClusterNode(rmNode.getNodeID())
                   .setQueueWaitTime(estimatedQueueWaitTime)
-                  .setQueueLength(waitQueueLength));
+                  .setQueueLength(waitQueueLength)
+                  .setMaxQueueLength(maxOpportQueueLength));
           LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "] " +
               "with queue wait time [" + estimatedQueueWaitTime + "] and " +
               "wait queue length [" + waitQueueLength + "]");
@@ -291,7 +321,11 @@ public void updateNodeResource(RMNode rmNode, ResourceOption resourceOption) {
     ReentrantReadWriteLock.ReadLock readLock = clusterNodesLock.readLock();
     readLock.lock();
     try {
-      ArrayList aList = new ArrayList<>(this.clusterNodes.values());
+      // Exclude nodes whose queue is already full.
+      List<ClusterNode> aList = clusterNodes.values()
+          .stream()
+          .filter(clusterNode -> !clusterNode.isQueueFull())
+          .collect(Collectors.toList());
       List retList = new ArrayList<>();
       Object[] nodes = aList.toArray();
       // Collections.sort would do something similar by calling Arrays.sort
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java
index dfd21ffc0e3..4073999d4ac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java
@@ -33,6 +33,8 @@
  */
 public class TestNodeQueueLoadMonitor {
 
+  private static final int DEFAULT_MAX_QUEUE_LENGTH = 50;
+
   static class FakeNodeId extends NodeId {
     final String host;
     final int port;
@@ -132,6 +134,16 @@ public void testQueueLengthSort() {
     Assert.assertEquals("h2:2", nodeIds.get(1).toString());
     Assert.assertEquals("h1:1", nodeIds.get(2).toString());
     Assert.assertEquals("h4:4", nodeIds.get(3).toString());
+
+    // Now update h3 and fill its queue.
+    selector.updateNode(createRMNode("h3", 3, -1, DEFAULT_MAX_QUEUE_LENGTH));
+    selector.computeTask.run();
+    nodeIds = selector.selectNodes();
+    System.out.println("4-> "+ nodeIds);
+    Assert.assertEquals(3, nodeIds.size());
+    Assert.assertEquals("h2:2", nodeIds.get(0).toString());
+    Assert.assertEquals("h1:1", nodeIds.get(1).toString());
+    Assert.assertEquals("h4:4", nodeIds.get(2).toString());
   }
 
   @Test
@@ -180,6 +192,12 @@ public void testContainerQueuingLimit() {
 
   private RMNode createRMNode(String host, int port,
       int waitTime, int queueLength) {
+    return createRMNode(host, port, waitTime, queueLength,
+        DEFAULT_MAX_QUEUE_LENGTH);
+  }
+
+  private RMNode createRMNode(String host, int port,
+      int waitTime, int queueLength, int maxQueueLength) {
     RMNode node1 = Mockito.mock(RMNode.class);
     NodeId nID1 = new FakeNodeId(host, port);
     Mockito.when(node1.getNodeID()).thenReturn(nID1);
@@ -189,6 +207,8 @@ private RMNode createRMNode(String host, int port,
         .thenReturn(waitTime);
     Mockito.when(status1.getWaitQueueLength())
         .thenReturn(queueLength);
+    Mockito.when(status1.getMaxOpportQueueLength())
+        .thenReturn(maxQueueLength);
     Mockito.when(node1.getOpportunisticContainersStatus()).thenReturn(status1);
     return node1;
   }