diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index 8962aba..e71ddff 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -179,7 +179,7 @@ public String getNodeManagerVersion() { } @Override - public void updateNodeHeartbeatResponseForContainersDecreasing( + public void updateNodeHeartbeatResponseForUpdatedContainers( NodeHeartbeatResponse response) { // TODO Auto-generated method stub diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index d7b159c..6b7ac3c 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -168,7 +168,7 @@ public String getNodeManagerVersion() { } @Override - public void updateNodeHeartbeatResponseForContainersDecreasing( + public void updateNodeHeartbeatResponseForUpdatedContainers( NodeHeartbeatResponse response) { // TODO Auto-generated method stub } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 8515e0a..86f45b8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -167,6 +167,10 @@ private static void addDeprecatedKeys() { public static final String RM_APPLICATION_MASTER_SERVICE_PROCESSORS = RM_PREFIX + "application-master-service.processors"; + public static final String RM_AUTO_UPDATE_CONTAINERS = + RM_PREFIX + "auto-update.containers"; + public static final boolean DEFAULT_RM_AUTO_UPDATE_CONTAINERS = false; + /** The actual bind address for the RM.*/ public static final String RM_BIND_HOST = RM_PREFIX + "bind-host"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index 7568bbb..3b0ec10 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -104,10 +104,10 @@ public abstract void setAreNodeLabelsAcceptedByRM( public abstract void setResource(Resource resource); - public abstract List getContainersToDecrease(); + public abstract List getContainersToUpdate(); - public abstract void addAllContainersToDecrease( - Collection containersToDecrease); + public abstract void addAllContainersToUpdate( + Collection containersToUpdate); public abstract ContainerQueuingLimit getContainerQueuingLimit(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 51c1a78..343af83 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -75,7 +75,7 @@ private MasterKey containerTokenMasterKey = null; private MasterKey nmTokenMasterKey = null; private ContainerQueuingLimit containerQueuingLimit = null; - private List containersToDecrease = null; + private List containersToUpdate = null; private List containersToSignal = null; public NodeHeartbeatResponsePBImpl() { @@ -119,8 +119,8 @@ private void mergeLocalToBuilder() { if (this.systemCredentials != null) { addSystemCredentialsToProto(); } - if (this.containersToDecrease != null) { - addContainersToDecreaseToProto(); + if (this.containersToUpdate != null) { + addContainersToUpdateToProto(); } if (this.containersToSignal != null) { addContainersToSignalToProto(); @@ -499,39 +499,39 @@ public void remove() { builder.addAllApplicationsToCleanup(iterable); } - private void initContainersToDecrease() { - if (this.containersToDecrease != null) { + private void initContainersToUpdate() { + if (this.containersToUpdate != null) { return; } NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; - List list = p.getContainersToDecreaseList(); - this.containersToDecrease = new ArrayList<>(); + List list = p.getContainersToUpdateList(); + this.containersToUpdate = new ArrayList<>(); for (ContainerProto c : list) { - this.containersToDecrease.add(convertFromProtoFormat(c)); + this.containersToUpdate.add(convertFromProtoFormat(c)); } } @Override - public List getContainersToDecrease() { - initContainersToDecrease(); - return this.containersToDecrease; + public List getContainersToUpdate() { + initContainersToUpdate(); + return this.containersToUpdate; } @Override - public void addAllContainersToDecrease( - final Collection containersToDecrease) { - if (containersToDecrease == null) { + public void addAllContainersToUpdate( + final Collection containersToUpdate) { + if (containersToUpdate == null) { return; } - initContainersToDecrease(); - this.containersToDecrease.addAll(containersToDecrease); + initContainersToUpdate(); + this.containersToUpdate.addAll(containersToUpdate); } - private void addContainersToDecreaseToProto() { + private void addContainersToUpdateToProto() { maybeInitBuilder(); - builder.clearContainersToDecrease(); - if (this.containersToDecrease == null) { + builder.clearContainersToUpdate(); + if (this.containersToUpdate == null) { return; } Iterable iterable = new @@ -539,7 +539,7 @@ private void addContainersToDecreaseToProto() { @Override public Iterator iterator() { return new Iterator() { - private Iterator iter = containersToDecrease.iterator(); + private Iterator iter = containersToUpdate.iterator(); @Override public boolean hasNext() { return iter.hasNext(); @@ -555,7 +555,7 @@ public void remove() { }; } }; - builder.addAllContainersToDecrease(iterable); + builder.addAllContainersToUpdate(iterable); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index edb2d9c..4e05fba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -111,11 +111,14 @@ message NodeHeartbeatResponseProto { repeated ContainerIdProto containers_to_be_removed_from_nm = 9; repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10; optional bool areNodeLabelsAcceptedByRM = 11 [default = false]; + // to be deprecated in favour of containers_to_update repeated ContainerProto containers_to_decrease = 12; repeated SignalContainerRequestProto containers_to_signal = 13; optional ResourceProto resource = 14; optional ContainerQueuingLimitProto container_queuing_limit = 15; repeated AppCollectorsMapProto app_collectors_map = 16; + // to be used in place of containers_to_decrease + repeated ContainerProto containers_to_update = 17; } message ContainerQueuingLimitProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java index b670c36..8c0c73a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java @@ -180,14 +180,14 @@ public void testNodeHeartbeatResponsePBImplWithRMAcceptLbls() { @Test public void testNodeHeartbeatResponsePBImplWithDecreasedContainers() { NodeHeartbeatResponsePBImpl original = new NodeHeartbeatResponsePBImpl(); - original.addAllContainersToDecrease( + original.addAllContainersToUpdate( Arrays.asList(getDecreasedContainer(1, 2, 2048, 2), getDecreasedContainer(2, 3, 1024, 1))); NodeHeartbeatResponsePBImpl copy = new NodeHeartbeatResponsePBImpl(original.getProto()); - assertEquals(1, copy.getContainersToDecrease().get(0) + assertEquals(1, copy.getContainersToUpdate().get(0) .getId().getContainerId()); - assertEquals(1024, copy.getContainersToDecrease().get(1) + assertEquals(1024, copy.getContainersToUpdate().get(1) .getResource().getMemorySize()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index b5ec383..1d9256f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -1099,7 +1099,7 @@ public void run() { parseCredentials(systemCredentials)); } List - containersToDecrease = response.getContainersToDecrease(); + containersToDecrease = response.getContainersToUpdate(); if (!containersToDecrease.isEmpty()) { dispatcher.getEventHandler().handle( new CMgrDecreaseContainersResourceEvent( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index a1e8ca0..12931bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -166,6 +166,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -1241,10 +1242,19 @@ private void updateContainerInternal(ContainerId containerId, org.apache.hadoop.yarn.server.nodemanager. containermanager.container.ContainerState currentState = container.getContainerState(); - if (currentState != org.apache.hadoop.yarn.server. - nodemanager.containermanager.container.ContainerState.RUNNING && - currentState != org.apache.hadoop.yarn.server. - nodemanager.containermanager.container.ContainerState.SCHEDULED) { + EnumSet allowedStates = EnumSet.of( + org.apache.hadoop.yarn.server.nodemanager.containermanager.container + .ContainerState.RUNNING, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container + .ContainerState.SCHEDULED, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container + .ContainerState.LOCALIZING, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container + .ContainerState.REINITIALIZING, + org.apache.hadoop.yarn.server.nodemanager.containermanager.container + .ContainerState.RELAUNCHING); + if (!allowedStates.contains(currentState)) { throw RPCUtil.getRemoteException("Container " + containerId.toString() + " is in " + currentState.name() + " state." + " Resource can only be changed when a container is in" @@ -1279,17 +1289,12 @@ private void updateContainerInternal(ContainerId containerId, org.apache.hadoop.yarn.api.records.Container.newInstance( containerId, null, null, targetResource, null, null, currentExecType); - } else { - increasedContainer = - org.apache.hadoop.yarn.api.records.Container.newInstance( - containerId, null, null, currentResource, null, null, - targetExecType); - } - if (context.getIncreasedContainers().putIfAbsent(containerId, - increasedContainer) != null){ - throw RPCUtil.getRemoteException("Container " + containerId.toString() - + " resource is being increased -or- " + - "is undergoing ExecutionType promoted."); + if (context.getIncreasedContainers().putIfAbsent(containerId, + increasedContainer) != null){ + throw RPCUtil.getRemoteException("Container " + containerId.toString() + + " resource is being increased -or- " + + "is undergoing ExecutionType promoted."); + } } } this.readLock.lock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java index 19b4505..644bdae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java @@ -173,6 +173,7 @@ private void onUpdateContainer(UpdateContainerSchedulerEvent updateEvent) { new ChangeMonitoringContainerResourceEvent(containerId, updateEvent.getUpdatedToken().getResource())); } else { + // Is Queued or localizing.. updateEvent.getContainer().setContainerTokenIdentifier( updateEvent.getUpdatedToken()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index aa7f524..e6f2bb2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -551,7 +551,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) getResponseId() + 1, NodeAction.NORMAL, null, null, null, null, nextHeartBeatInterval); rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse); - rmNode.updateNodeHeartbeatResponseForContainersDecreasing( + rmNode.updateNodeHeartbeatResponseForUpdatedContainers( nodeHeartBeatResponse); populateKeys(request, nodeHeartBeatResponse); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index 1e9463a..f49db7e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -51,8 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode - .RMNodeDecreaseContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeUpdateContainerEvent; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; import org.apache.hadoop.yarn.state.MultipleArcTransition; @@ -284,7 +283,6 @@ public void setContainer(Container container) { @Override public RMContainerState getState() { this.readLock.lock(); - try { return this.stateMachine.getCurrentState(); } finally { @@ -598,7 +596,7 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { RMContainerUpdatesAcquiredEvent acquiredEvent = (RMContainerUpdatesAcquiredEvent) event; if (acquiredEvent.isIncreasedContainer()) { - // If container is increased but not acquired by AM, we will start + // If container is increased but not started by AM, we will start // containerAllocationExpirer for this container in this transition. container.containerAllocationExpirer.register( new AllocationExpirationInfo(event.getContainerId(), true)); @@ -641,7 +639,7 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { container.lastConfirmedResource = rmContainerResource; container.containerAllocationExpirer.unregister( new AllocationExpirationInfo(event.getContainerId())); - container.eventHandler.handle(new RMNodeDecreaseContainerEvent( + container.eventHandler.handle(new RMNodeUpdateContainerEvent( container.nodeId, Collections.singletonList(container.getContainer()))); } else if (Resources.fitsIn(nmContainerResource, rmContainerResource)) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index 86f8679..bb1e6ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -169,9 +169,9 @@ public Set getNodeLabels(); /** - * Update containers to be decreased + * Update containers to be updated */ - public void updateNodeHeartbeatResponseForContainersDecreasing( + public void updateNodeHeartbeatResponseForUpdatedContainers( NodeHeartbeatResponse response); public List pullNewlyIncreasedContainers(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecreaseContainerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecreaseContainerEvent.java deleted file mode 100644 index 62925ad..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecreaseContainerEvent.java +++ /dev/null @@ -1,39 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.hadoop.yarn.server.resourcemanager.rmnode; - -import java.util.List; - -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.NodeId; - -public class RMNodeDecreaseContainerEvent extends RMNodeEvent { - final List toBeDecreasedContainers; - - public RMNodeDecreaseContainerEvent(NodeId nodeId, - List toBeDecreasedContainers) { - super(nodeId, RMNodeEventType.DECREASE_CONTAINER); - - this.toBeDecreasedContainers = toBeDecreasedContainers; - } - - public List getToBeDecreasedContainers() { - return toBeDecreasedContainers; - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java index b28fef3..a3b2ed7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java @@ -42,7 +42,7 @@ // Source: Container CONTAINER_ALLOCATED, CLEANUP_CONTAINER, - DECREASE_CONTAINER, + UPDATE_CONTAINER, // Source: ClientRMService SIGNAL_CONTAINER, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 1f121f8..04d6efb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -171,7 +171,7 @@ private final List runningApplications = new ArrayList(); - private final Map toBeDecreasedContainers = + private final Map toBeUpdatedContainers = new HashMap<>(); private final Map nmReportedIncreasedContainers = @@ -228,8 +228,8 @@ .addTransition(NodeState.RUNNING, NodeState.RUNNING, RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenRunningTransition()) .addTransition(NodeState.RUNNING, NodeState.RUNNING, - RMNodeEventType.DECREASE_CONTAINER, - new DecreaseContainersTransition()) + RMNodeEventType.UPDATE_CONTAINER, + new UpdateContainersTransition()) .addTransition(NodeState.RUNNING, NodeState.RUNNING, RMNodeEventType.SIGNAL_CONTAINER, new SignalContainerTransition()) .addTransition(NodeState.RUNNING, NodeState.SHUTDOWN, @@ -614,18 +614,18 @@ public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response }; @VisibleForTesting - public Collection getToBeDecreasedContainers() { - return toBeDecreasedContainers.values(); + public Collection getToBeUpdatedContainers() { + return toBeUpdatedContainers.values(); } @Override - public void updateNodeHeartbeatResponseForContainersDecreasing( + public void updateNodeHeartbeatResponseForUpdatedContainers( NodeHeartbeatResponse response) { this.writeLock.lock(); try { - response.addAllContainersToDecrease(toBeDecreasedContainers.values()); - toBeDecreasedContainers.clear(); + response.addAllContainersToUpdate(toBeUpdatedContainers.values()); + toBeUpdatedContainers.clear(); } finally { this.writeLock.unlock(); } @@ -1032,15 +1032,15 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { } } - public static class DecreaseContainersTransition + public static class UpdateContainersTransition implements SingleArcTransition { @Override public void transition(RMNodeImpl rmNode, RMNodeEvent event) { - RMNodeDecreaseContainerEvent de = (RMNodeDecreaseContainerEvent) event; + RMNodeUpdateContainerEvent de = (RMNodeUpdateContainerEvent) event; - for (Container c : de.getToBeDecreasedContainers()) { - rmNode.toBeDecreasedContainers.put(c.getId(), c); + for (Container c : de.getToBeUpdatedContainers()) { + rmNode.toBeUpdatedContainers.put(c.getId(), c); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeUpdateContainerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeUpdateContainerEvent.java new file mode 100644 index 0000000..6c441e8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeUpdateContainerEvent.java @@ -0,0 +1,39 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.resourcemanager.rmnode; + +import java.util.List; + +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.NodeId; + +public class RMNodeUpdateContainerEvent extends RMNodeEvent { + final List toBeUpdatedContainers; + + public RMNodeUpdateContainerEvent(NodeId nodeId, + List toBeUpdatedContainers) { + super(nodeId, RMNodeEventType.UPDATE_CONTAINER); + + this.toBeUpdatedContainers = toBeUpdatedContainers; + } + + public List getToBeUpdatedContainers() { + return toBeUpdatedContainers; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 79caab0..c3879dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -150,6 +150,10 @@ */ protected final ReentrantReadWriteLock.WriteLock writeLock; + // If set to true, then ALL container updates will be automatically sent to + // the NM in the next heartbeat. + private boolean autoUpdateContainers = false; + /** * Construct the service. * @@ -178,6 +182,9 @@ public void serviceInit(Configuration conf) throws Exception { configuredMaximumAllocationWaitTime); maxClusterLevelAppPriority = getMaxPriorityFromConf(conf); createReleaseCache(); + autoUpdateContainers = + conf.getBoolean(YarnConfiguration.RM_AUTO_UPDATE_CONTAINERS, + YarnConfiguration.DEFAULT_RM_AUTO_UPDATE_CONTAINERS); super.serviceInit(conf); } @@ -235,6 +242,10 @@ public boolean accept(SchedulerNode node) { return nodeTracker.getNodes(nodeFilter); } + public boolean shouldContainersBeAutoUpdated() { + return this.autoUpdateContainers; + } + @Override public Resource getClusterResource() { return nodeTracker.getClusterCapacity(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 397d507..cc14a1e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -74,8 +74,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerUpdatesAcquiredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode - .RMNodeDecreaseContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeUpdateContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity; @@ -663,20 +662,38 @@ private Container updateContainerAndNMToken(RMContainer rmContainer, + " an updated container " + container.getId(), e); return null; } - - if (updateType == null || - ContainerUpdateType.PROMOTE_EXECUTION_TYPE == updateType || - ContainerUpdateType.DEMOTE_EXECUTION_TYPE == updateType) { + + if (updateType == null) { + // This is a newly allocated container rmContainer.handle(new RMContainerEvent( rmContainer.getContainerId(), RMContainerEventType.ACQUIRED)); } else { - rmContainer.handle(new RMContainerUpdatesAcquiredEvent( - rmContainer.getContainerId(), - ContainerUpdateType.INCREASE_RESOURCE == updateType)); - if (ContainerUpdateType.DECREASE_RESOURCE == updateType) { + // Resource increase is handled as follows: + // If the AM does not use the updated token to increase the container + // for a configured period of time, the RM will automatically rollback + // the update by performing a container decrease. This rollback (which + // essentially is another resource decrease update) is notified to the + // NM heartbeat response. If autoUpdate flag is set, then AM does not + // need to do anything - same code path as resource decrease. + // + // Resource Decrease is always automatic: the AM never has to do + // anything. It is always via NM heartbeat response. + // + // ExecutionType updates (both Promotion and Demotion) are either + // always automatic (if the flag is set) or the AM has to explicitly + // call updateContainer() on the NM. There is no expiry + boolean autoUpdate = + ContainerUpdateType.DECREASE_RESOURCE == updateType || + ((AbstractYarnScheduler)rmContext.getScheduler()) + .shouldContainersBeAutoUpdated(); + if (autoUpdate) { this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeDecreaseContainerEvent(rmContainer.getNodeId(), + new RMNodeUpdateContainerEvent(rmContainer.getNodeId(), Collections.singletonList(rmContainer.getContainer()))); + } else { + rmContainer.handle(new RMContainerUpdatesAcquiredEvent( + rmContainer.getContainerId(), + ContainerUpdateType.INCREASE_RESOURCE == updateType)); } } return container; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 91170d1..7f58711 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -242,7 +242,7 @@ public long getLastHealthReportTime() { } @Override - public void updateNodeHeartbeatResponseForContainersDecreasing( + public void updateNodeHeartbeatResponseForUpdatedContainers( NodeHeartbeatResponse response) { } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java index b4b05ed..291a74e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java @@ -51,8 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler - .SchedulerApplicationAttempt; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica @@ -60,11 +59,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; public class TestContainerResizing { @@ -205,7 +202,7 @@ public RMNodeLabelsManager createNodeLabelManager() { RMNodeImpl rmNode = (RMNodeImpl) rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); Collection decreasedContainers = - rmNode.getToBeDecreasedContainers(); + rmNode.getToBeUpdatedContainers(); boolean rmNodeReceivedDecreaseContainer = false; for (Container c : decreasedContainers) { if (c.getId().equals(containerId1) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java index 184e854..a76ed64 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java @@ -319,7 +319,7 @@ public void testConsecutiveContainerIncreaseAllocationExpiration() verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 16 * GB); // Verify NM receives the decrease message (3G) List containersToDecrease = - nm1.nodeHeartbeat(true).getContainersToDecrease(); + nm1.nodeHeartbeat(true).getContainersToUpdate(); Assert.assertEquals(1, containersToDecrease.size()); Assert.assertEquals( 3 * GB, containersToDecrease.get(0).getResource().getMemorySize()); @@ -435,7 +435,7 @@ public void testDecreaseAfterIncreaseWithAllocationExpiration() .getAllocatedResource().getMemorySize()); // Verify NM receives 2 decrease message List containersToDecrease = - nm1.nodeHeartbeat(true).getContainersToDecrease(); + nm1.nodeHeartbeat(true).getContainersToUpdate(); Assert.assertEquals(2, containersToDecrease.size()); // Sort the list to make sure containerId3 is the first Collections.sort(containersToDecrease);