diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index 7667157..d59b783 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode @@ -49,7 +48,7 @@ public static NodeId newNodeID(String host, int port) { private String nodeAddr; private String httpAddress; private int cmdPort; - private volatile ResourceOption perNode; + private volatile Resource perNode; private String rackName; private String healthReport; private NodeState state; @@ -57,7 +56,7 @@ public static NodeId newNodeID(String host, int port) { private List toCleanUpApplications; public FakeRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress, - ResourceOption perNode, String rackName, String healthReport, + Resource perNode, String rackName, String healthReport, int cmdPort, String hostName, NodeState state) { this.nodeId = nodeId; this.nodeAddr = nodeAddr; @@ -105,10 +104,6 @@ public long getLastHealthReportTime() { } public Resource getTotalCapability() { - return perNode.getResource(); - } - - public ResourceOption getResourceOption() { return perNode; } @@ -153,32 +148,27 @@ public NodeHeartbeatResponse getLastNodeHeartBeatResponse() { return list; } - @Override - public String getNodeManagerVersion() { - // TODO Auto-generated method stub - return null; - } - @Override - public void setResourceOption(ResourceOption resourceOption) { - perNode = resourceOption; + public String getNodeManagerVersion() { + // TODO Auto-generated method stub + return null; } + } public static RMNode newNodeInfo(String rackName, String hostName, - final ResourceOption resourceOption, int port) { + final Resource resource, int port) { final NodeId nodeId = newNodeID(hostName, port); final String nodeAddr = hostName + ":" + port; final String httpAddress = hostName; return new FakeRMNodeImpl(nodeId, nodeAddr, httpAddress, - resourceOption, rackName, "Me good", + resource, rackName, "Me good", port, hostName, null); } public static RMNode newNodeInfo(String rackName, String hostName, final Resource resource) { - return newNodeInfo(rackName, hostName, ResourceOption.newInstance(resource, - RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), NODE_ID++); + return newNodeInfo(rackName, hostName, resource, NODE_ID++); } } diff --git hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index bbe24c8..755bf5f 100644 --- hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -24,7 +24,6 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode @@ -144,14 +143,4 @@ public String getNodeManagerVersion() { return node.getNodeManagerVersion(); } - @Override - public void setResourceOption(ResourceOption resourceOption) { - node.setResourceOption(resourceOption); - } - - @Override - public ResourceOption getResourceOption() { - return node.getResourceOption(); - } - } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index 33230d8..f2af2e2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -71,6 +71,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; import com.google.protobuf.BlockingService; @@ -395,6 +397,7 @@ void refreshServiceAcls(Configuration configuration, return UserGroupInformation.createRemoteUser(user).getGroupNames(); } + @SuppressWarnings("unchecked") @Override public UpdateNodeResourceResponse updateNodeResource( UpdateNodeResourceRequest request) throws YarnException, IOException { @@ -425,7 +428,12 @@ public UpdateNodeResourceResponse updateNodeResource( if (node == null) { LOG.warn("Resource update get failed on an unrecognized node: " + nodeId); } else { - node.setResourceOption(newResourceOption); + // update resource to RMNode + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeResourceUpdateEvent(nodeId, newResourceOption.getResource())); + // update resourceOption to SchedulerNode + this.rmContext.getDispatcher().getEventHandler().handle( + new NodeResourceUpdateSchedulerEvent(node, newResourceOption)); LOG.info("Update resource successfully on node(" + node.getNodeID() +") with resource(" + newResourceOption.toString() + ")"); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index f80ce85..0e05be8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -30,12 +30,10 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.VersionUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -269,8 +267,7 @@ public RegisterNodeManagerResponse registerNodeManager( .getCurrentKey()); RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort, - resolve(host), ResourceOption.newInstance(capability, RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), - nodeManagerVersion); + resolve(host), capability, nodeManagerVersion); RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode); if (oldNode == null) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index 24793e8..a423ea5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -102,18 +102,6 @@ public Resource getTotalCapability(); /** - * Set resource option with total available resource and overCommitTimoutMillis - * @param resourceOption - */ - public void setResourceOption(ResourceOption resourceOption); - - /** - * resource option with total available resource and overCommitTimoutMillis - * @return ResourceOption - */ - public ResourceOption getResourceOption(); - - /** * The rack name for this node manager. * @return the rack name. */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java index ef644be..aef6801 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java @@ -24,6 +24,7 @@ // Source: AdminService DECOMMISSION, + RESOURCE_UPDATE, // ResourceTrackerService STATUS_UPDATE, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 52bc285..4146331 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -43,7 +43,6 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -93,7 +92,7 @@ private final int httpPort; private final String nodeAddress; // The containerManager address private final String httpAddress; - private volatile ResourceOption resourceOption; + private volatile Resource totalCapability; private final Node node; private String healthReport; @@ -146,6 +145,8 @@ RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition()) .addTransition(NodeState.RUNNING, NodeState.RUNNING, RMNodeEventType.RECONNECTED, new ReconnectNodeTransition()) + .addTransition(NodeState.RUNNING, NodeState.RUNNING, + RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceTransition()) //Transitions from UNHEALTHY state .addTransition(NodeState.UNHEALTHY, @@ -174,13 +175,13 @@ RMNodeEvent> stateMachine; public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, - int cmPort, int httpPort, Node node, ResourceOption resourceOption, String nodeManagerVersion) { + int cmPort, int httpPort, Node node, Resource capability, String nodeManagerVersion) { this.nodeId = nodeId; this.context = context; this.hostName = hostName; this.commandPort = cmPort; this.httpPort = httpPort; - this.resourceOption = resourceOption; + this.totalCapability = capability; this.nodeAddress = hostName + ":" + cmPort; this.httpAddress = hostName + ":" + httpPort; this.node = node; @@ -236,17 +237,7 @@ public String getHttpAddress() { @Override public Resource getTotalCapability() { - return this.resourceOption.getResource(); - } - - @Override - public void setResourceOption(ResourceOption resourceOption) { - this.resourceOption = resourceOption; - } - - @Override - public ResourceOption getResourceOption(){ - return this.resourceOption; + return this.totalCapability; } @Override @@ -505,7 +496,18 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { NodesListManagerEventType.NODE_USABLE, rmNode)); } } + + public static class UpdateNodeResourceTransition + implements SingleArcTransition { + @Override + public void transition(RMNodeImpl rmNode, RMNodeEvent event) { + Resource resource = ((RMNodeResourceUpdateEvent)event).getResource(); + // Set resource on node + rmNode.totalCapability = resource; + } + } + public static class CleanUpAppTransition implements SingleArcTransition { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeResourceUpdateEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeResourceUpdateEvent.java new file mode 100644 index 0000000..95eeac4 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeResourceUpdateEvent.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmnode; + +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; + +public class RMNodeResourceUpdateEvent extends RMNodeEvent { + + private final Resource resource; + + public RMNodeResourceUpdateEvent(NodeId nodeId, Resource resource) { + super(nodeId, RMNodeEventType.RESOURCE_UPDATE); + this.resource = resource; + } + + public Resource getResource() { + return resource; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 524b1ab..a637333 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -81,6 +81,12 @@ public abstract Resource getTotalResource(); /** + * Set total resources on the node. + * @param total resources on the node. + */ + public abstract void setTotalResource(Resource resource); + + /** * Get the ID of the node which contains both its hostname and port. * @return the ID of the node */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java index 572a9f9..eeaf783 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java @@ -31,7 +31,6 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -150,32 +149,25 @@ public static void normalizeRequest( /** * Update resource in SchedulerNode if any resource change in RMNode. * @param node SchedulerNode with old resource view - * @param rmNode RMNode with new resource view - * @param clusterResource the cluster's resource that need to update + * @param resource RMNode with new resource view + * @param clusterResource the cluster's resource that need to be updated * @param log Scheduler's log for resource change */ - public static void updateResourceIfChanged(SchedulerNode node, - RMNode rmNode, Resource clusterResource, Log log) { - Resource oldAvailableResource = node.getAvailableResource(); - Resource newAvailableResource = Resources.subtract( - rmNode.getTotalCapability(), node.getUsedResource()); + public static void updateResourceOnSchedulerNode(SchedulerNode node, + Resource newResource, Resource clusterResource, Log log) { + Resource oldResource = node.getTotalResource(); - if (!newAvailableResource.equals(oldAvailableResource)) { - Resource deltaResource = Resources.subtract(newAvailableResource, - oldAvailableResource); - // Reflect resource change to scheduler node. - node.applyDeltaOnAvailableResource(deltaResource); - // Reflect resource change to clusterResource. - Resources.addTo(clusterResource, deltaResource); - // TODO process resource over-commitment case (allocated containers - // > total capacity) in different option by getting value of - // overCommitTimeoutMillis. - - // Log resource change - log.info("Resource change on node: " + rmNode.getNodeAddress() - + " with delta: CPU: " + deltaResource.getMemory() + "core, Memory: " - + deltaResource.getMemory() +"MB"); - } + // Log resource change + log.info("Update resource on node: " + node.getNodeName() + + " from: " + oldResource + ", to: " + + newResource); + Resource deltaResource = Resources.subtract(newResource, oldResource); + + // update resource to node + node.setTotalResource(newResource); + + // update resource to clusterResource + Resources.addTo(clusterResource, deltaResource); } /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 6025639..d457e3d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -66,7 +67,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; @@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -631,9 +632,6 @@ private synchronized void nodeUpdate(RMNode nm) { FiCaSchedulerNode node = getNode(nm.getNodeID()); - // Update resource if any change - SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource, LOG); - List containerInfoList = nm.pullContainerUpdates(); List newlyLaunchedContainers = new ArrayList(); List completedContainers = new ArrayList(); @@ -702,6 +700,17 @@ private synchronized void nodeUpdate(RMNode nm) { } } + + /** + * Process resource update on a node. + */ + private synchronized void nodeResourceUpdate(RMNode nm, + ResourceOption resourceOption) { + FiCaSchedulerNode node = getNode(nm.getNodeID()); + Resource newResource = resourceOption.getResource(); + SchedulerUtils.updateResourceOnSchedulerNode(node, newResource, + clusterResource, LOG); + } private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) { // Get the application for the finished container @@ -734,6 +743,14 @@ public void handle(SchedulerEvent event) { removeNode(nodeRemovedEvent.getRemovedRMNode()); } break; + case NODE_RESOURCE_UPDATE: + { + NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent = + (NodeResourceUpdateSchedulerEvent)event; + nodeResourceUpdate(nodeResourceUpdatedEvent.getRMNode(), + nodeResourceUpdatedEvent.getResourceOption()); + } + break; case NODE_UPDATE: { NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java index 23068fe..bec4228 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java @@ -274,4 +274,11 @@ public synchronized void applyDeltaOnAvailableResource(Resource deltaResource) { Resources.addTo(this.availableResource, deltaResource); } + @Override + public synchronized void setTotalResource(Resource resource) { + this.totalResourceCapability = resource; + this.availableResource = Resources.subtract(totalResourceCapability, + this.usedResource); + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeResourceUpdateSchedulerEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeResourceUpdateSchedulerEvent.java new file mode 100644 index 0000000..df32b28 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeResourceUpdateSchedulerEvent.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; + +import org.apache.hadoop.yarn.api.records.ResourceOption; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; + +public class NodeResourceUpdateSchedulerEvent extends SchedulerEvent { + + private final RMNode rmNode; + private final ResourceOption resourceOption; + + public NodeResourceUpdateSchedulerEvent(RMNode rmNode, + ResourceOption resourceOption) { + super(SchedulerEventType.NODE_RESOURCE_UPDATE); + this.rmNode = rmNode; + this.resourceOption = resourceOption; + } + + public RMNode getRMNode() { + return rmNode; + } + + public ResourceOption getResourceOption() { + return resourceOption; + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java index dd1aec7..b40996c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java @@ -24,6 +24,7 @@ NODE_ADDED, NODE_REMOVED, NODE_UPDATE, + NODE_RESOURCE_UPDATE, // Source: RMAppAttempt APP_ATTEMPT_ADDED, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java index 97ea6d4..8c49316 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java @@ -276,4 +276,11 @@ public synchronized void applyDeltaOnAvailableResource(Resource deltaResource) { Resources.addTo(this.availableResource, deltaResource); } + @Override + public synchronized void setTotalResource(Resource resource) { + this.totalResourceCapability = resource; + this.availableResource = Resources.subtract(totalResourceCapability, + this.usedResource); + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index bc716c1..e4c3c39 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -72,7 +73,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; @@ -80,6 +80,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -894,6 +895,17 @@ private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode no } /** + * Process resource update on a node. + */ + private synchronized void nodeResourceUpdate(RMNode nm, + ResourceOption resourceOption) { + FSSchedulerNode node = nodes.get(nm.getNodeID()); + Resource newResource = resourceOption.getResource(); + SchedulerUtils.updateResourceOnSchedulerNode(node, newResource, + this.clusterCapacity, LOG); + } + + /** * Process a heartbeat update from a node. */ private synchronized void nodeUpdate(RMNode nm) { @@ -902,9 +914,6 @@ private synchronized void nodeUpdate(RMNode nm) { } eventLog.log("HEARTBEAT", nm.getHostName()); FSSchedulerNode node = nodes.get(nm.getNodeID()); - - // Update resource if any change - SchedulerUtils.updateResourceIfChanged(node, nm, clusterCapacity, LOG); List containerInfoList = nm.pullContainerUpdates(); List newlyLaunchedContainers = new ArrayList(); @@ -1090,6 +1099,15 @@ public void handle(SchedulerEvent event) { NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; nodeUpdate(nodeUpdatedEvent.getRMNode()); break; + case NODE_RESOURCE_UPDATE: + if (!(event instanceof NodeResourceUpdateSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent = + (NodeResourceUpdateSchedulerEvent)event; + nodeResourceUpdate(nodeResourceUpdatedEvent.getRMNode(), + nodeResourceUpdatedEvent.getResourceOption()); + break; case APP_ATTEMPT_ADDED: if (!(event instanceof AppAttemptAddedSchedulerEvent)) { throw new RuntimeException("Unexpected event type: " + event); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index c5f0bd6..50c82cb 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -83,6 +84,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -151,6 +153,7 @@ public QueueInfo getQueueInfo( queueInfo.setCurrentCapacity((float) usedResource.getMemory() / clusterResource.getMemory()); } + LOG.warn("XXX: usedResource:" +usedResource.getMemory() + ", clusterResource:" + clusterResource.getMemory()); queueInfo.setMaximumCapacity(1.0f); queueInfo.setChildQueues(new ArrayList()); queueInfo.setQueueState(QueueState.RUNNING); @@ -633,13 +636,21 @@ private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application return assignedContainers; } + + /** + * Process resource update on a node. + */ + private synchronized void nodeResourceUpdate(RMNode nm, + ResourceOption resourceOption) { + FiCaSchedulerNode node = getNode(nm.getNodeID()); + Resource newResource = resourceOption.getResource(); + SchedulerUtils.updateResourceOnSchedulerNode(node, newResource, + clusterResource, LOG); + } private synchronized void nodeUpdate(RMNode rmNode) { FiCaSchedulerNode node = getNode(rmNode.getNodeID()); - // Update resource if any change - SchedulerUtils.updateResourceIfChanged(node, rmNode, clusterResource, LOG); - List containerInfoList = rmNode.pullContainerUpdates(); List newlyLaunchedContainers = new ArrayList(); List completedContainers = new ArrayList(); @@ -690,6 +701,14 @@ public void handle(SchedulerEvent event) { removeNode(nodeRemovedEvent.getRemovedRMNode()); } break; + case NODE_RESOURCE_UPDATE: + { + NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent = + (NodeResourceUpdateSchedulerEvent)event; + nodeResourceUpdate(nodeResourceUpdatedEvent.getRMNode(), + nodeResourceUpdatedEvent.getResourceOption()); + } + break; case NODE_UPDATE: { NodeUpdateSchedulerEvent nodeUpdatedEvent = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 8ef01d9..79f9098 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -27,7 +27,6 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; @@ -94,14 +93,14 @@ public static Resource newAvailResource(Resource total, Resource used) { private String nodeAddr; private String httpAddress; private int cmdPort; - private ResourceOption perNode; + private Resource perNode; private String rackName; private String healthReport; private long lastHealthReportTime; private NodeState state; public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress, - ResourceOption perNode, String rackName, String healthReport, + Resource perNode, String rackName, String healthReport, long lastHealthReportTime, int cmdPort, String hostName, NodeState state) { this.nodeId = nodeId; this.nodeAddr = nodeAddr; @@ -147,7 +146,7 @@ public String getHttpAddress() { @Override public Resource getTotalCapability() { - return this.perNode.getResource(); + return this.perNode; } @Override @@ -203,16 +202,6 @@ public String getHealthReport() { public long getLastHealthReportTime() { return lastHealthReportTime; } - - @Override - public void setResourceOption(ResourceOption resourceOption) { - this.perNode = resourceOption; - } - - @Override - public ResourceOption getResourceOption(){ - return this.perNode; - } }; @@ -232,9 +221,8 @@ private static RMNode buildRMNode(int rack, final Resource perNode, final String httpAddress = httpAddr; String healthReport = (state == NodeState.UNHEALTHY) ? null : "HealthyMe"; - return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress, - ResourceOption.newInstance(perNode, RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), - rackName, healthReport, 0, nid, hostName, state); + return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress, perNode, + rackName, healthReport, 0, nid, hostName, state); } public static RMNode nodeInfo(int rack, final Resource perNode, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 82046c7..e1aa9c1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.resourcemanager; +import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; @@ -36,18 +37,17 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.InlineDispatcher; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; @@ -453,8 +453,7 @@ private RMNodeImpl getRunningNode() { NodeId nodeId = BuilderUtils.newNodeId("localhost", 0); Resource capability = Resource.newInstance(4096, 4); RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0, - null, ResourceOption.newInstance(capability, - RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), null); + null, capability, null); node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED)); Assert.assertEquals(NodeState.RUNNING, node.getState()); return node; @@ -524,5 +523,22 @@ public void testReconnect() { Assert.assertEquals(NodesListManagerEventType.NODE_USABLE, nodesListManagerEvent.getType()); } + + @Test + public void testResourceUpdate() { + RMNodeImpl node = getRunningNode(); + Resource oldCapacity = node.getTotalCapability(); + assertEquals("Memory resource is not match.", oldCapacity.getMemory(), 4096); + assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4); + node.handle(new RMNodeResourceUpdateEvent(node.getNodeID(), Resource.newInstance(2048, 2))); + Resource newCapacity = node.getTotalCapability(); + assertEquals("Memory resource is not match.", newCapacity.getMemory(), 2048); + assertEquals("CPU resource is not match.", newCapacity.getVirtualCores(), 2); + + Assert.assertEquals(NodeState.RUNNING, node.getState()); + Assert.assertNotNull(nodesListManagerEvent); + Assert.assertEquals(NodesListManagerEventType.NODE_USABLE, + nodesListManagerEvent.getType()); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index 7ce7e42..33126f8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; @@ -254,17 +255,16 @@ public void testUpdateResourceOnNode() throws Exception { (Map) method.invoke(scheduler); assertEquals(schedulerNodes.values().size(), 1); - // set resource of RMNode to 1024 and verify it works. - node0.setResourceOption(ResourceOption.newInstance( - Resources.createResource(1024, 4), RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)); - assertEquals(node0.getTotalCapability().getMemory(), 1024); - // verify that SchedulerNode's resource hasn't been changed. - assertEquals(schedulerNodes.get(node0.getNodeID()). - getAvailableResource().getMemory(), 2048); - // now, NM heartbeat comes. - NodeUpdateSchedulerEvent node0Update = new NodeUpdateSchedulerEvent(node0); - scheduler.handle(node0Update); - // SchedulerNode's available resource is changed. + Resource newResource = Resources.createResource(1024, 4); + + NodeResourceUpdateSchedulerEvent node0ResourceUpdate = new + NodeResourceUpdateSchedulerEvent(node0, ResourceOption.newInstance( + newResource, RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)); + scheduler.handle(node0ResourceUpdate); + + // SchedulerNode's total resource and available resource are changed. + assertEquals(schedulerNodes.get(node0.getNodeID()).getTotalResource() + .getMemory(), 1024); assertEquals(schedulerNodes.get(node0.getNodeID()). getAvailableResource().getMemory(), 1024); QueueInfo queueInfo = scheduler.getQueueInfo(null, false, false); @@ -296,6 +296,7 @@ public void testUpdateResourceOnNode() throws Exception { // Before the node update event, there are one local request Assert.assertEquals(1, nodeLocal.getNumContainers()); + NodeUpdateSchedulerEvent node0Update = new NodeUpdateSchedulerEvent(node0); // Now schedule. scheduler.handle(node0Update); @@ -523,7 +524,6 @@ public void testConcurrentAccessOnApplications() throws Exception { fs.applications, FiCaSchedulerApp.class, Queue.class); } - @SuppressWarnings("resource") @Test public void testBlackListNodes() throws Exception { Configuration conf = new Configuration();