diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 1d40320..b5d26ab 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -32,7 +32,6 @@ import org.apache.hadoop.util.VersionUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -60,6 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -240,13 +240,6 @@ public RegisterNodeManagerResponse registerNodeManager( Resource capability = request.getResource(); String nodeManagerVersion = request.getNMVersion(); - if (!request.getContainerStatuses().isEmpty()) { - LOG.info("received container statuses 
on node manager register :" - + request.getContainerStatuses()); - for (ContainerStatus containerStatus : request.getContainerStatuses()) { - handleContainerStatus(containerStatus); - } - } RegisterNodeManagerResponse response = recordFactory .newRecordInstance(RegisterNodeManagerResponse.class); @@ -305,7 +298,7 @@ public RegisterNodeManagerResponse registerNodeManager( RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode); if (oldNode == null) { this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeEvent(nodeId, RMNodeEventType.STARTED)); + new RMNodeStartEvent(nodeId, request.getContainerStatuses())); } else { LOG.info("Reconnect from the node at: " + host); this.nmLivelinessMonitor.unregister(nodeId); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index f4f2e20..523be7f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import 
org.apache.hadoop.yarn.state.InvalidStateTransitonException; @@ -199,12 +200,6 @@ new FinalSavingTransition(FINISHED_TRANSITION, RMAppState.FINISHED)) .addTransition(RMAppState.ACCEPTED, RMAppState.KILLING, RMAppEventType.KILL, new KillAttemptTransition()) - // ACCECPTED state can once again receive APP_ACCEPTED event, because on - // recovery the app returns ACCEPTED state and the app once again go - // through the scheduler and triggers one more APP_ACCEPTED event at - // ACCEPTED state. - .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, - RMAppEventType.APP_ACCEPTED) // Transitions from RUNNING state .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, @@ -763,6 +758,11 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { return RMAppState.SUBMITTED; } + // Add the current attempt to the scheduler.It'll be removed from + // scheduler in RMAppAttempt#BaseFinalTransition + app.handler.handle(new AppAttemptAddedSchedulerEvent(app.currentAttempt + .getAppAttemptId(), false)); + // YARN-1507 is saving the application state after the application is // accepted. So after YARN-1507, an app is saved meaning it is accepted. // Thus we return ACCECPTED state on recovery. @@ -1055,8 +1055,12 @@ public void transition(RMAppImpl app, RMAppEvent event) { if (app.finishTime == 0 ) { app.finishTime = System.currentTimeMillis(); } - app.handler.handle(new AppRemovedSchedulerEvent(app.applicationId, - finalState)); + // Recovered apps that were completed were not added to scheduler, so no + // need to remove them from scheduler. 
+ if (app.recoveredFinalState == null) { + app.handler.handle(new AppRemovedSchedulerEvent(app.applicationId, + finalState)); + } app.handler.handle( new RMAppManagerEvent(app.applicationId, RMAppManagerEventType.APP_COMPLETED)); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java index b4bad12..4a479f8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java @@ -203,5 +203,4 @@ * @return {@link ApplicationAttemptReport} */ ApplicationAttemptReport createApplicationAttemptReport(); - } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index e289ad5..83abf62 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -267,15 +267,17 @@ .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING, 
RMAppAttemptEventType.CONTAINER_FINISHED, new FinalSavingTransition( - new AMContainerCrashedTransition(), RMAppAttemptState.FAILED)) + new AMContainerCrashedBeforeRunningTransition(), RMAppAttemptState.FAILED)) // Transitions from LAUNCHED State .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.RUNNING, RMAppAttemptEventType.REGISTERED, new AMRegisteredTransition()) - .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING, + .addTransition(RMAppAttemptState.LAUNCHED, + EnumSet.of(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING), RMAppAttemptEventType.CONTAINER_FINISHED, - new FinalSavingTransition( - new AMContainerCrashedTransition(), RMAppAttemptState.FAILED)) + new ContainerFinishedTransition( + new AMContainerCrashedBeforeRunningTransition(), + RMAppAttemptState.LAUNCHED)) .addTransition( RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING, RMAppAttemptEventType.EXPIRE, @@ -302,7 +304,9 @@ RMAppAttemptState.RUNNING, EnumSet.of(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING), RMAppAttemptEventType.CONTAINER_FINISHED, - new ContainerFinishedTransition()) + new ContainerFinishedTransition( + new AMContainerCrashedAtRunningTransition(), + RMAppAttemptState.RUNNING)) .addTransition( RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING, RMAppAttemptEventType.EXPIRE, @@ -1207,17 +1211,16 @@ public void transition(RMAppAttemptImpl appAttempt, } } - private static final class AMContainerCrashedTransition extends + private static final class AMContainerCrashedBeforeRunningTransition extends BaseFinalTransition { - public AMContainerCrashedTransition() { + public AMContainerCrashedBeforeRunningTransition() { super(RMAppAttemptState.FAILED); } @Override public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { - RMAppAttemptContainerFinishedEvent finishEvent = ((RMAppAttemptContainerFinishedEvent)event); @@ -1410,6 +1413,16 @@ public void transition(RMAppAttemptImpl appAttempt, implements 
MultipleArcTransition { + // The transition To Do after attempt final state is saved. + private BaseTransition transitionToDo; + private RMAppAttemptState currentState; + + public ContainerFinishedTransition(BaseTransition transitionToDo, + RMAppAttemptState currentState) { + this.transitionToDo = transitionToDo; + this.currentState = currentState; + } + @Override public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { @@ -1426,14 +1439,13 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, containerStatus.getContainerId())) { // Remember the follow up transition and save the final attempt state. appAttempt.rememberTargetTransitionsAndStoreState(event, - new ContainerFinishedFinalStateSavedTransition(), - RMAppAttemptState.FAILED, RMAppAttemptState.FAILED); + transitionToDo, RMAppAttemptState.FAILED, RMAppAttemptState.FAILED); return RMAppAttemptState.FINAL_SAVING; } // Normal container.Put it in completedcontainers list appAttempt.justFinishedContainers.add(containerStatus); - return RMAppAttemptState.RUNNING; + return this.currentState; } } @@ -1451,7 +1463,7 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, } } - private static class ContainerFinishedFinalStateSavedTransition extends + private static class AMContainerCrashedAtRunningTransition extends BaseTransition { @Override public void diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java index ace4435..259d68b3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java @@ -33,5 +33,7 @@ RELEASED, // Source: ContainerAllocationExpirer - EXPIRE + EXPIRE, + + RECOVER } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index 2921891..9c7322e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; +import org.apache.hadoop.yarn.state.MultipleArcTransition; import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; @@ -65,6 +66,9 @@ RMContainerEventType.KILL) .addTransition(RMContainerState.NEW, RMContainerState.RESERVED, RMContainerEventType.RESERVED, new ContainerReservedTransition()) + .addTransition(RMContainerState.NEW, + EnumSet.of(RMContainerState.RUNNING, RMContainerState.COMPLETED), + RMContainerEventType.RECOVER, new ContainerRecoveredTransition()) // Transitions from RESERVED state .addTransition(RMContainerState.RESERVED, 
RMContainerState.RESERVED, @@ -341,6 +345,29 @@ public void transition(RMContainerImpl cont, RMContainerEvent event) { } } + private static final class ContainerRecoveredTransition + implements + MultipleArcTransition { + @Override + public RMContainerState transition(RMContainerImpl container, + RMContainerEvent event) { + ContainerStatus containerStatus = ((RMContainerRecoverEvent) event).getContainerStatus(); + if (containerStatus.getState().equals(ContainerState.COMPLETE)) { + new FinishedTransition().transition(container, + new RMContainerFinishedEvent(container.containerId, containerStatus, + RMContainerEventType.FINISHED)); + return RMContainerState.COMPLETED; + } else if (containerStatus.getState() + .equals(ContainerState.RUNNING)) { + return RMContainerState.RUNNING; + } else { + LOG.warn("RMContainer received unexpected recover event with container" + + " state " + containerStatus.getState() + " while recovering."); + return RMContainerState.RUNNING; + } + } + } + private static final class ContainerReservedTransition extends BaseTransition { @@ -398,7 +425,6 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { // Inform AppAttempt container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent( container.appAttemptId, finishedEvent.getRemoteContainerStatus())); - container.rmContext.getRMApplicationHistoryWriter() .containerFinished(container); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerRecoverEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerRecoverEvent.java new file mode 100644 index 0000000..f90723d --- /dev/null +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerRecoverEvent.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; + +public class RMContainerRecoverEvent extends RMContainerEvent { + + private final ContainerStatus containerStatus; + + public RMContainerRecoverEvent(ContainerId containerId, + ContainerStatus containerStatus) { + super(containerId, RMContainerEventType.RECOVER); + this.containerStatus = containerStatus; + } + + public ContainerStatus getContainerStatus() { + return containerStatus; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 0d33796..d5ed4a7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -460,13 +460,9 @@ private void updateMetricsForDeactivatedNode(NodeState initialState, @Override public void transition(RMNodeImpl rmNode, RMNodeEvent event) { // Inform the scheduler + RMNodeStartEvent startEvent = (RMNodeStartEvent) event; + List containers = null; - rmNode.context.getDispatcher().getEventHandler().handle( - new NodeAddedSchedulerEvent(rmNode)); - rmNode.context.getDispatcher().getEventHandler().handle( - new NodesListManagerEvent( - NodesListManagerEventType.NODE_USABLE, rmNode)); - String host = rmNode.nodeId.getHost(); if (rmNode.context.getInactiveRMNodes().containsKey(host)) { // Old node rejoining @@ -476,10 +472,17 @@ public void 
transition(RMNodeImpl rmNode, RMNodeEvent event) { } else { // Increment activeNodes explicitly because this is a new node. ClusterMetrics.getMetrics().incrNumActiveNodes(); + containers = startEvent.getContainerStatuses(); } + + rmNode.context.getDispatcher().getEventHandler() + .handle(new NodeAddedSchedulerEvent(rmNode, containers)); + rmNode.context.getDispatcher().getEventHandler().handle( + new NodesListManagerEvent( + NodesListManagerEventType.NODE_USABLE, rmNode)); } } - + public static class ReconnectNodeTransition implements SingleArcTransition { @@ -513,7 +516,7 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { } rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode); rmNode.context.getDispatcher().getEventHandler().handle( - new RMNodeEvent(newNode.getNodeID(), RMNodeEventType.STARTED)); + new RMNodeStartEvent(newNode.getNodeID(), null)); } rmNode.context.getDispatcher().getEventHandler().handle( new NodesListManagerEvent( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartEvent.java new file mode 100644 index 0000000..379666d --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartEvent.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmnode; + +import java.util.List; + +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; + +public class RMNodeStartEvent extends RMNodeEvent { + + private List containerStatuses; + + public RMNodeStartEvent(NodeId nodeId, List containerStatuses) { + super(nodeId, RMNodeEventType.STARTED); + this.containerStatuses = containerStatuses; + } + + public List getContainerStatuses() { + return this.containerStatuses; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 0f3af41..be05f95 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -22,21 +22,46 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import 
org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.util.resource.Resources; +@SuppressWarnings("unchecked") public abstract class AbstractYarnScheduler implements ResourceScheduler { + private static final Log LOG = LogFactory.getLog(AbstractYarnScheduler.class); + protected RMContext rmContext; protected Map applications; + protected Map nodes = + new ConcurrentHashMap(); + + // whole capacity of the cluster + protected Resource clusterResource = Resource.newInstance(0, 0); + protected final static List EMPTY_CONTAINER_LIST = new ArrayList(); protected static final Allocation 
EMPTY_ALLOCATION = new Allocation( @@ -74,4 +99,99 @@ public String moveApplication(ApplicationId appId, String newQueue) throw new YarnException(getClass().getSimpleName() + " does not support moving apps between queues"); } + + public void recoverContainersOnNode(List containerStatuses, + RMNode nm) { + if (containerStatuses == null + || (containerStatuses != null && containerStatuses.isEmpty())) { + return; + } + + for (ContainerStatus status : containerStatuses) { + ApplicationId appId = + status.getContainerId().getApplicationAttemptId().getApplicationId(); + RMApp rmApp = rmContext.getRMApps().get(appId); + if (rmApp == null) { + LOG.error("Skip recovering container " + status + + " for unknown application."); + continue; + } + + if (rmApp.getApplicationSubmissionContext().getUnmanagedAM()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skip recovering container " + status + + " for unmanaged AM." + rmApp.getApplicationId()); + } + continue; + } + + SchedulerApplication schedulerApp = applications.get(appId); + if (schedulerApp == null) { + LOG.info("Skip recovering container " + status + + " for unknown SchedulerApplication. Application state is " + + rmApp.getState()); + continue; + } + + LOG.info("Recovering container " + status); + SchedulerApplicationAttempt schedulerAttempt = + schedulerApp.getCurrentAppAttempt(); + + // create container + RMContainer rmContainer = createContainer(status, nm); + + // recover RMContainer + rmContainer.handle(new RMContainerRecoverEvent(status.getContainerId(), + status)); + + // recover scheduler node + nodes.get(nm.getNodeID()).recoverContainer(rmContainer); + + // recover queue: update headroom etc. 
+ Queue queue = schedulerAttempt.getQueue(); + queue.recoverContainer(clusterResource, schedulerAttempt, rmContainer); + + // recover scheduler attempt + schedulerAttempt.recoverContainer(rmContainer); + + // recover app scheduling info + schedulerAttempt.appSchedulingInfo.recoverContainer(rmContainer); + } + } + + public RMContainer createContainer(ContainerStatus status, RMNode node) { + Container container = + Container.newInstance(status.getContainerId(), node.getNodeID(), + node.getHttpAddress(), Resource.newInstance(1024, 1), + Priority.newInstance(0), null); + ApplicationAttemptId attemptId = + container.getId().getApplicationAttemptId(); + RMContainer rmContainer = + new RMContainerImpl(container, attemptId, node.getNodeID(), + applications.get(attemptId.getApplicationId()).getUser(), rmContext); + return rmContainer; + } + + public SchedulerNode getSchedulerNode(NodeId nodeId) { + return nodes.get(nodeId); + } + + protected void notifyAppAccepted(ApplicationId appId) { + // No need to re-send app_accepted event to recovered apps. + if (!rmContext.getRMApps().get(appId).getState() + .equals(RMAppState.ACCEPTED)) { + rmContext.getDispatcher().getEventHandler() + .handle(new RMAppEvent(appId, RMAppEventType.APP_ACCEPTED)); + } + } + + protected void notifyAttemptAdded(ApplicationAttemptId attemptId) { + // If attempt is already at launched state, that means this is a recovered + // attempt. So no need to re-send attempt_added event to recovered attempts. 
+ if (!rmContext.getRMApps().get(attemptId.getApplicationId()) + .getRMAppAttempt(attemptId).getState().equals(RMAppAttemptState.LAUNCHED)) { + rmContext.getDispatcher().getEventHandler() .handle( + new RMAppAttemptEvent(attemptId, RMAppAttemptEventType.ATTEMPT_ADDED)); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index de71f71..f0f7df6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -39,6 +39,8 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.util.resource.Resources; /** @@ -409,4 +411,28 @@ public synchronized void transferStateFromPreviousAppSchedulingInfo( // this.requests = appInfo.getRequests(); this.blacklist = appInfo.getBlackList(); } + + public synchronized void recoverContainer(RMContainer rmContainer) { + int containerId = rmContainer.getContainerId().getId(); + // set the containerId counter to be the max of all the Ids of recovered + // containers. 
+ if (containerId >= containerIdCounter.get()) { + containerIdCounter.set(containerId); + } + if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { + return; + } + + QueueMetrics metrics = queue.getMetrics(); + if (pending) { + // If there were any running containers, the application was + // running from scheduler's POV. + pending = false; + metrics.runAppAttempt(applicationId, user); + } + if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { + return; + } + metrics.allocateResources(user, 1, Resource.newInstance(1024, 1), false); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java index c51f819..f6fb59a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java @@ -26,6 +26,8 @@ import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @Evolving @LimitedPrivate("yarn") @@ -60,4 +62,13 @@ boolean hasAccess(QueueACL acl, UserGroupInformation user); public ActiveUsersManager getActiveUsersManager(); + + /** + * Recover the state of the queue + * @param clusterResource the resource of the cluster + * @param schedulerAttempt the application attempt for which the container was allocated + * @param rmContainer the container that was recovered.
+ */ + public void recoverContainer(Resource clusterResource, + SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index fc7e047..7038705 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.util.resource.Resources; @@ -499,5 +500,22 @@ public synchronized void move(Queue newQueue) { appSchedulingInfo.move(newQueue); this.queue = newQueue; - } + } + + public synchronized void recoverContainer(RMContainer rmContainer) { + if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { + return; + } + LOG.info("SchedulerAttempt " + getApplicationAttemptId() + + " is recovering container " + rmContainer.getContainerId()); + liveContainers.put(rmContainer.getContainerId(), rmContainer); + Resources.addTo(currentConsumption, rmContainer.getContainer() + 
.getResource()); + // resourceLimit: updated when LeafQueue#recoverContainer#allocateResource + // is called. + + // NOTE(review): newlyAllocatedContainers, schedulingOpportunities and + // lastScheduledContainer are not restored by this method; confirm + // whether recovery needs to rebuild them. + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 524b1ab..e1416a7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -20,9 +20,11 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; /** * Represents a YARN Cluster Node from the viewpoint of the scheduler.
@@ -86,4 +88,7 @@ */ public abstract NodeId getNodeID(); + public abstract void recoverContainer(RMContainer rmContainer); + + public abstract boolean isValidContainer(ContainerId containerId); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index f5090ba..ccb71e2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -28,7 +28,6 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueState; @@ -235,15 +234,6 @@ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) public ActiveUsersManager getActiveUsersManager(); /** - * Recover the state of the queue - * @param clusterResource the resource of the cluster - * @param application the application for which the container was allocated - * @param container the container that was recovered. - */ - public void recoverContainer(Resource clusterResource, FiCaSchedulerApp application, - Container container); - - /** * Adds all applications in the queue and its subqueues to the given collection. 
* @param apps the collection to add the applications to */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index e28c18c..7536295 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -53,7 +53,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -77,6 +76,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; @@ -182,11 +182,6 @@ public Configuration getConf() { 
private Map queues = new ConcurrentHashMap(); - private Map nodes = - new ConcurrentHashMap(); - - private Resource clusterResource = - RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class); private int numNodeManagers = 0; private Resource minimumAllocation; @@ -331,16 +326,16 @@ long getAsyncScheduleInterval() { static void schedule(CapacityScheduler cs) { // First randomize the start point int current = 0; - Collection nodes = cs.getAllNodes().values(); + Collection nodes = cs.getAllNodes().values(); int start = random.nextInt(nodes.size()); - for (FiCaSchedulerNode node : nodes) { + for (SchedulerNode node : nodes) { if (current++ >= start) { - cs.allocateContainersToNode(node); + cs.allocateContainersToNode((FiCaSchedulerNode)node); } } // Now, just get everyone to be safe - for (FiCaSchedulerNode node : nodes) { - cs.allocateContainersToNode(node); + for (SchedulerNode node : nodes) { + cs.allocateContainersToNode((FiCaSchedulerNode)node); } try { Thread.sleep(cs.getAsyncScheduleInterval()); @@ -541,8 +536,8 @@ private synchronized void addApplication(ApplicationId applicationId, applications.put(applicationId, application); LOG.info("Accepted application " + applicationId + " from user: " + user + ", in queue: " + queueName); - rmContext.getDispatcher().getEventHandler() - .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); + + notifyAppAccepted(applicationId); } private synchronized void addApplicationAttempt( @@ -565,9 +560,8 @@ private synchronized void addApplicationAttempt( LOG.info("Added Application Attempt " + applicationAttemptId + " to scheduler from user " + application.getUser() + " in queue " + queue.getQueueName()); - rmContext.getDispatcher().getEventHandler() .handle( - new RMAppAttemptEvent(applicationAttemptId, - RMAppAttemptEventType.ATTEMPT_ADDED)); + + notifyAttemptAdded(applicationAttemptId); } private synchronized void doneApplication(ApplicationId applicationId, @@ -861,6 +855,8 @@ public void 
handle(SchedulerEvent event) { { NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event; addNode(nodeAddedEvent.getAddedRMNode()); + recoverContainersOnNode(nodeAddedEvent.getContainerStatuses(), + nodeAddedEvent.getAddedRMNode()); } break; case NODE_REMOVED: @@ -942,7 +938,7 @@ private synchronized void addNode(RMNode nodeManager) { } private synchronized void removeNode(RMNode nodeInfo) { - FiCaSchedulerNode node = this.nodes.get(nodeInfo.getNodeID()); + FiCaSchedulerNode node = (FiCaSchedulerNode)this.nodes.get(nodeInfo.getNodeID()); if (node == null) { return; } @@ -1041,11 +1037,11 @@ public ApplicationResourceUsageReport getAppResourceUsageReport( @Lock(Lock.NoLock.class) FiCaSchedulerNode getNode(NodeId nodeId) { - return nodes.get(nodeId); + return (FiCaSchedulerNode)nodes.get(nodeId); } @Lock(Lock.NoLock.class) - Map getAllNodes() { + Map getAllNodes() { return nodes; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 971edb8..2a474d7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -59,15 +59,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; +import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.server.utils.Lock.NoLock; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; +import com.google.common.annotations.VisibleForTesting; + @Private @Unstable public class LeafQueue implements CSQueue { @@ -564,7 +566,8 @@ public String toString() { "numContainers=" + getNumContainers(); } - private synchronized User getUser(String userName) { + @VisibleForTesting + public synchronized User getUser(String userName) { User user = users.get(userName); if (user == null) { user = new User(); @@ -1446,7 +1449,7 @@ public void completedContainer(Resource clusterResource, } synchronized void allocateResource(Resource clusterResource, - FiCaSchedulerApp application, Resource resource) { + SchedulerApplicationAttempt application, Resource resource) { // Update queue metrics Resources.addTo(usedResources, resource); CSQueueUtils.updateQueueStatistics( @@ -1530,7 +1533,8 @@ public QueueMetrics getMetrics() { return metrics; } - static class User { + @VisibleForTesting + public static class User { Resource consumed = Resources.createResource(0, 0); int pendingApplications = 0; int activeApplications = 0; @@ -1580,13 +1584,16 @@ public synchronized void releaseContainer(Resource resource) { @Override public void recoverContainer(Resource clusterResource, - FiCaSchedulerApp application, Container container) { + SchedulerApplicationAttempt attempt, RMContainer rmContainer) { + if 
(rmContainer.getState().equals(RMContainerState.COMPLETED)) { + return; + } // Careful! Locking order is important! synchronized (this) { - allocateResource(clusterResource, application, container.getResource()); + allocateResource(clusterResource, attempt, rmContainer.getContainer() + .getResource()); } - getParent().recoverContainer(clusterResource, application, container); - + getParent().recoverContainer(clusterResource, attempt, rmContainer); } /** @@ -1613,5 +1620,4 @@ public void collectSchedulerApplications( apps.add(app.getApplicationAttemptId()); } } - } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 1f09475..19f550c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -38,7 +38,6 @@ import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -49,9 +48,11 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -770,13 +771,16 @@ public QueueMetrics getMetrics() { @Override public void recoverContainer(Resource clusterResource, - FiCaSchedulerApp application, Container container) { + SchedulerApplicationAttempt attempt, RMContainer rmContainer) { + if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { + return; + } // Careful! Locking order is important! 
synchronized (this) { - allocateResource(clusterResource, container.getResource()); + allocateResource(clusterResource,rmContainer.getContainer().getResource()); } if (parent != null) { - parent.recoverContainer(clusterResource, application, container); + parent.recoverContainer(clusterResource, attempt, rmContainer); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java index 58f12d0..8041283 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java @@ -35,11 +35,14 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.util.resource.Resources; +import com.google.common.annotations.VisibleForTesting; + public class FiCaSchedulerNode extends SchedulerNode { private static final Log LOG = LogFactory.getLog(FiCaSchedulerNode.class); @@ -135,8 +138,10 @@ public Resource getTotalResource() { return 
this.totalResourceCapability; } - private synchronized boolean isValidContainer(Container c) { - if (launchedContainers.containsKey(c.getId())) + @Override + @VisibleForTesting + public synchronized boolean isValidContainer(ContainerId containerId) { + if (launchedContainers.containsKey(containerId)) return true; return false; } @@ -151,7 +156,7 @@ private synchronized void updateResource(Container container) { * @param container container to be released */ public synchronized void releaseContainer(Container container) { - if (!isValidContainer(container)) { + if (!isValidContainer(container.getId())) { LOG.error("Invalid container released " + container); return; } @@ -282,4 +287,11 @@ public synchronized void applyDeltaOnAvailableResource(Resource deltaResource) { Resources.addTo(this.availableResource, deltaResource); } + @Override + public void recoverContainer(RMContainer rmContainer) { + if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { + return; + } + allocateContainer(null, rmContainer); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java index c487f48..825ca78 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java @@ -18,19 +18,34 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; +import java.util.List; + +import 
org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; public class NodeAddedSchedulerEvent extends SchedulerEvent { private final RMNode rmNode; + private final List containerStatuses; public NodeAddedSchedulerEvent(RMNode rmNode) { super(SchedulerEventType.NODE_ADDED); this.rmNode = rmNode; + this.containerStatuses = null; + } + + public NodeAddedSchedulerEvent(RMNode rmNode, + List containerStatuses) { + super(SchedulerEventType.NODE_ADDED); + this.rmNode = rmNode; + this.containerStatuses = containerStatuses; } public RMNode getAddedRMNode() { return rmNode; } + public List getContainerStatuses() { + return containerStatuses; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index e842a6a..b396b07 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -33,8 +33,10 @@ import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import 
org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; @@ -254,4 +256,11 @@ public int getNumRunnableApps() { public ActiveUsersManager getActiveUsersManager() { return activeUsersManager; } + + @Override + public void recoverContainer(Resource clusterResource, + SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) { + // TODO Auto-generated method stub + + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index 427cb86..67e5104 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -33,7 +33,9 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @Private @Unstable @@ -200,4 +202,11 @@ public ActiveUsersManager getActiveUsersManager() { // Should never be called since all applications are submitted to LeafQueues return null; } + + @Override + public void recoverContainer(Resource clusterResource, + SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) { 
+ // TODO Auto-generated method stub + + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java index 97ea6d4..6185f56 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java @@ -133,8 +133,8 @@ public synchronized Resource getUsedResource() { return usedResource; } - private synchronized boolean isValidContainer(Container c) { - if (launchedContainers.containsKey(c.getId())) { + public synchronized boolean isValidContainer(ContainerId containerId) { + if (launchedContainers.containsKey(containerId)) { return true; } return false; @@ -150,7 +150,7 @@ private synchronized void updateResource(Container container) { * @param container container to be released */ public synchronized void releaseContainer(Container container) { - if (!isValidContainer(container)) { + if (!isValidContainer(container.getId())) { LOG.error("Invalid container released " + container); return; } @@ -275,5 +275,11 @@ public synchronized void applyDeltaOnAvailableResource(Resource deltaResource) { // we can only adjust available resource if total resource is changed. 
Resources.addTo(this.availableResource, deltaResource); } + + @Override + public void recoverContainer(RMContainer rmContainer) { + // TODO Auto-generated method stub + + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 82000e1..9bbc332 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -79,6 +79,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; @@ -114,8 +115,6 @@ Configuration conf; - protected Map nodes = new ConcurrentHashMap(); - private boolean initialized; private Resource minimumAllocation; private Resource maximumAllocation; @@ -185,6 +184,17 @@ public boolean hasAccess(QueueACL acl, UserGroupInformation user) { public ActiveUsersManager getActiveUsersManager() { return activeUsersManager; } + + @Override + public void 
recoverContainer(Resource clusterResource, + SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) { + if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { + return; + } + increaseUsedResources(rmContainer); + updateAppHeadRoom(schedulerAttempt); + updateAvailableResourcesMetrics(); + } }; @Override @@ -357,7 +367,7 @@ public ApplicationResourceUsageReport getAppResourceUsageReport( } private FiCaSchedulerNode getNode(NodeId nodeId) { - return nodes.get(nodeId); + return (FiCaSchedulerNode)nodes.get(nodeId); } private synchronized void addApplication(ApplicationId applicationId, @@ -368,8 +378,8 @@ private synchronized void addApplication(ApplicationId applicationId, metrics.submitApp(user); LOG.info("Accepted application " + applicationId + " from user: " + user + ", currently num of applications: " + applications.size()); - rmContext.getDispatcher().getEventHandler() - .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); + + notifyAppAccepted(applicationId); } private synchronized void @@ -392,9 +402,8 @@ private synchronized void addApplication(ApplicationId applicationId, metrics.submitAppAttempt(user); LOG.info("Added Application Attempt " + appAttemptId + " to scheduler from user " + application.getUser()); - rmContext.getDispatcher().getEventHandler().handle( - new RMAppAttemptEvent(appAttemptId, - RMAppAttemptEventType.ATTEMPT_ADDED)); + + notifyAttemptAdded(appAttemptId); } private synchronized void doneApplication(ApplicationId applicationId, @@ -497,7 +506,7 @@ private void assignContainers(FiCaSchedulerNode node) { for (SchedulerApplication application : applications.values()) { FiCaSchedulerApp attempt = (FiCaSchedulerApp) application.getCurrentAppAttempt(); - attempt.setHeadroom(Resources.subtract(clusterResource, usedResource)); + updateAppHeadRoom(attempt); } } @@ -672,7 +681,7 @@ private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application rmContainer); // Update usage for this 
container - Resources.addTo(usedResource, capability); + increaseUsedResources(rmContainer); } } @@ -716,9 +725,22 @@ private synchronized void nodeUpdate(RMNode rmNode) { LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = " + node.getAvailableResource()); } - - metrics.setAvailableResourcesToQueue( - Resources.subtract(clusterResource, usedResource)); + + updateAvailableResourcesMetrics(); + } + + private void increaseUsedResources(RMContainer rmContainer) { + Resources.addTo(usedResource, rmContainer.getAllocatedResource()); + } + + private void updateAppHeadRoom(SchedulerApplicationAttempt schedulerAttempt) { + schedulerAttempt.setHeadroom(Resources.subtract(clusterResource, + usedResource)); + } + + private void updateAvailableResourcesMetrics() { + metrics.setAvailableResourcesToQueue(Resources.subtract(clusterResource, + usedResource)); } @Override @@ -728,6 +750,9 @@ public void handle(SchedulerEvent event) { { NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event; addNode(nodeAddedEvent.getAddedRMNode()); + recoverContainersOnNode(nodeAddedEvent.getContainerStatuses(), + nodeAddedEvent.getAddedRMNode()); + } break; case NODE_REMOVED: @@ -856,7 +881,6 @@ private synchronized void containerCompleted(RMContainer rmContainer, } - private Resource clusterResource = recordFactory.newRecordInstance(Resource.class); private Resource usedResource = recordFactory.newRecordInstance(Resource.class); private synchronized void removeNode(RMNode nodeInfo) { @@ -950,4 +974,8 @@ public synchronized boolean checkAccess(UserGroupInformation callerUGI, } } + public Resource getUsedResource() { + return usedResource; + } + } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java index ce5dd96..d05bd87 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.yarn.server.resourcemanager; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; import java.io.IOException; import java.util.ArrayList; @@ -54,6 +57,9 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.Task.State; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -170,6 +176,15 @@ public synchronized void submit() throws IOException, YarnException { AppAddedSchedulerEvent addAppEvent = new AppAddedSchedulerEvent(this.applicationId, this.queue, "user"); scheduler.handle(addAppEvent); + + // fake the attempt to avoid NPE in test. 
+ RMAppAttempt attempt = mock(RMAppAttempt.class); + when(attempt.getState()).thenReturn(RMAppAttemptState.SUBMITTED); + RMApp spyApp = + spy(resourceManager.getRMContext().getRMApps().get(applicationId)); + when(spyApp.getRMAppAttempt(applicationAttemptId)).thenReturn(attempt); + resourceManager.getRMContext().getRMApps().put(applicationId, spyApp); + AppAttemptAddedSchedulerEvent addAttemptEvent = new AppAttemptAddedSchedulerEvent(this.applicationAttemptId, false); scheduler.handle(addAttemptEvent); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index cfd05f9..d28043d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -23,8 +23,6 @@ import java.util.ArrayList; import java.util.List; -import org.junit.Assert; - import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; @@ -34,6 +32,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; @@ -44,6 +43,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.util.Records; +import org.junit.Assert; public class MockAM { @@ -140,6 +140,24 @@ public AllocateResponse allocate( return allocate(reqs, releases); } + public List allocateContainers(int numContainers, + int memory, MockNM nm) throws Exception { + allocate(nm.getNodeId().getHost(), memory, numContainers, + new ArrayList()); + List containers = new ArrayList(); + int timeoutSecs = 0; + do { + nm.nodeHeartbeat(true); + containers.addAll(allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers()); + Thread.sleep(200); + System.out.println("Attempt " + this.attemptId + " is waiting for " + + numContainers + " containers to be allocated. Currently has " + + containers.size() + " containers allocated."); + } while (containers.size() != numContainers && timeoutSecs++ < 60); + return containers; + } + public List createReq(String[] hosts, int memory, int priority, int containers) throws Exception { List reqs = new ArrayList(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 1dcac06..f0002dc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -185,4 +185,11 @@ public NodeHeartbeatResponse nodeHeartbeat(Map containers = attempt.getJustFinishedContainers(); + System.out.println("Received completed containers " + 
containers); + if (containers.contains(completedContainer)) { + break; + } + Thread.sleep(200); + } + } + public void waitForState(MockNM nm, ContainerId containerId, RMContainerState containerState) throws Exception { RMContainer container = getResourceScheduler().getRMContainer(containerId); int timeoutSecs = 0; - while(container == null && timeoutSecs++ < 20) { + while(container == null && timeoutSecs++ < 100) { nm.nodeHeartbeat(true); container = getResourceScheduler().getRMContainer(containerId); System.out.println("Waiting for container " + containerId + " to be allocated."); @@ -333,7 +348,7 @@ public MockNM registerNode(String nodeIdStr, int memory, int vCores) public void sendNodeStarted(MockNM nm) throws Exception { RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get( nm.getNodeId()); - node.handle(new RMNodeEvent(nm.getNodeId(), RMNodeEventType.STARTED)); + node.handle(new RMNodeStartEvent(nm.getNodeId(), null)); } public void sendNodeLost(MockNM nm) throws Exception { @@ -542,4 +557,11 @@ public ApplicationReport getApplicationReport(ApplicationId appId) .newInstance(appId)); return response.getApplicationReport(); } + + // Explicitly reset queue metrics for testing. 
+ @SuppressWarnings("static-access") + public void clearQueueMetrics(RMApp app) { + ((AbstractYarnScheduler) getResourceScheduler()).getSchedulerApplications() + .get(app.getApplicationId()).getQueue().getMetrics().clearQueueMetrics(); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java index b80a6bc..36153de 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java @@ -43,10 +43,11 @@ public class TestMoveApplication { private ResourceManager resourceManager = null; private static boolean failMove; - + private Configuration conf; + @Before public void setUp() throws Exception { - Configuration conf = new YarnConfiguration(); + conf = new YarnConfiguration(); conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoSchedulerWithMove.class, FifoSchedulerWithMove.class); conf.set(YarnConfiguration.YARN_ADMIN_ACL, " "); @@ -119,28 +120,23 @@ public void testMoveTooLate() throws Exception { } } - @Test (timeout = 5000) - public void testMoveSuccessful() throws Exception { - // Submit application - Application application = new Application("user1", resourceManager); - ApplicationId appId = application.getApplicationId(); - application.submit(); - - // Wait for app to be accepted - RMApp app = resourceManager.rmContext.getRMApps().get(appId); - while (app.getState() != RMAppState.ACCEPTED) { - Thread.sleep(100); - } - - ClientRMService clientRMService = 
resourceManager.getClientRMService(); + @Test (timeout = 10000) + public + void testMoveSuccessful() throws Exception { + MockRM rm1 = new MockRM(conf); + rm1.start(); + RMApp app = rm1.submitApp(1024); + ClientRMService clientRMService = rm1.getClientRMService(); // FIFO scheduler does not support moves - clientRMService.moveApplicationAcrossQueues( - MoveApplicationAcrossQueuesRequest.newInstance(appId, "newqueue")); - - RMApp rmApp = resourceManager.getRMContext().getRMApps().get(appId); + clientRMService + .moveApplicationAcrossQueues(MoveApplicationAcrossQueuesRequest + .newInstance(app.getApplicationId(), "newqueue")); + + RMApp rmApp = rm1.getRMContext().getRMApps().get(app.getApplicationId()); assertEquals("newqueue", rmApp.getQueue()); + rm1.stop(); } - + @Test public void testMoveRejectedByPermissions() throws Exception { failMove = true; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 3bcde8d..6a5ffa9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -21,15 +21,14 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.when; import java.util.ArrayList; import java.util.Collections; import java.util.List; 
-import org.junit.Assert; - import org.apache.hadoop.util.HostsFileReader; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -49,6 +48,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; @@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; @@ -160,7 +161,7 @@ private RMNodeStatusEvent getMockRMNodeStatusEvent() { @Test (timeout = 5000) public void testExpiredContainer() { // Start the node - node.handle(new RMNodeEvent(null, RMNodeEventType.STARTED)); + node.handle(new RMNodeStartEvent(null, null)); verify(scheduler).handle(any(NodeAddedSchedulerEvent.class)); // Expire a container @@ -188,11 +189,11 @@ public void testExpiredContainer() { @Test (timeout = 5000) public void testContainerUpdate() throws InterruptedException{ //Start the node - node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED)); + node.handle(new RMNodeStartEvent(null, null)); NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1); RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null); - node2.handle(new RMNodeEvent(null,RMNodeEventType.STARTED)); + node2.handle(new RMNodeStartEvent(null, null)); ContainerId completedContainerIdFromNode1 = BuilderUtils.newContainerId( 
BuilderUtils.newApplicationAttemptId( @@ -248,7 +249,7 @@ public void testContainerUpdate() throws InterruptedException{ @Test (timeout = 5000) public void testStatusChange(){ //Start the node - node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED)); + node.handle(new RMNodeStartEvent(null, null)); //Add info to the queue first node.setNextHeartBeat(false); @@ -464,7 +465,7 @@ private RMNodeImpl getRunningNode(String nmVersion) { RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0, null, ResourceOption.newInstance(capability, RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), nmVersion); - node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED)); + node.handle(new RMNodeStartEvent(node.getNodeID(), null)); Assert.assertEquals(NodeState.RUNNING, node.getState()); return node; } @@ -495,7 +496,7 @@ public void testAdd() { int initialUnhealthy = cm.getUnhealthyNMs(); int initialDecommissioned = cm.getNumDecommisionedNMs(); int initialRebooted = cm.getNumRebootedNMs(); - node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED)); + node.handle(new RMNodeStartEvent(node.getNodeID(), null)); Assert.assertEquals("Active Nodes", initialActive + 1, cm.getNumActiveNMs()); Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs()); Assert.assertEquals("Unhealthy Nodes", diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java new file mode 100644 index 0000000..43d68c3 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -0,0 +1,344 @@ +package 
org.apache.hadoop.yarn.server.resourcemanager; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; + +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(value = Parameterized.class) +public class TestWorkPreservingRMRestart { + + private YarnConfiguration conf; + private Class schedulerClass; + MockRM rm1 = null; + MockRM rm2 = null; + + @Before + public void setup() throws UnknownHostException { + Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + conf = new YarnConfiguration(); + UserGroupInformation.setConfiguration(conf); + conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + DefaultMetricsSystem.setMiniClusterMode(true); + } + + @After + public void tearDown() { + if (rm1 != null) { + rm1.stop(); + } + if (rm2 != null) { + rm2.stop(); + } + } + + @Parameterized.Parameters + public static Collection getTestParameters() { + return Arrays.asList(new Object[][] { { CapacityScheduler.class }, + { FifoScheduler.class } }); + } + + public TestWorkPreservingRMRestart(Class schedulerClass) { + this.schedulerClass = schedulerClass; + } + + // Test common scheduler state including SchedulerAttempt, SchedulerNode, + // AppSchedulingInfo can be reconstructed via the container recovery reports + // on NM re-registration. + // Also test scheduler specific changes: i.e. Queue state + // (CSQueue/FSQueue/FifoQueue) according to the underlying scheduler chosen. 
+ // Test Strategy: send 3 container recovery reports(AMContainer, running + // container, completed container) on NM re-registration, check the states of + // SchedulerAttempt, SchedulerNode etc. are updated accordingly. + @Test(timeout = 20000) + public void testCommonSchedulerRecovery() throws Exception { + conf.setClass(YarnConfiguration.RM_SCHEDULER, schedulerClass, + ResourceScheduler.class); + conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true); + conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, + DominantResourceCalculator.class.getName()); + + int containerMemory = 1024; + Resource containerResource = Resource.newInstance(containerMemory, 1); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + nm1.registerNode(); + RMApp app1 = rm1.submitApp(200); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // clear queue metrics + rm1.clearQueueMetrics(app1); + + // Re-start RM + rm2 = new MockRM(conf, memStore); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + // recover app + RMApp recoveredApp1 = + rm2.getRMContext().getRMApps().get(app1.getApplicationId()); + RMAppAttempt loadedAttempt1 = recoveredApp1.getCurrentAppAttempt(); + ContainerStatus amContainer = + createContainerRecoveryReport(am1, 1, ContainerState.RUNNING); + ContainerStatus runningContainer = + createContainerRecoveryReport(am1, 2, ContainerState.RUNNING); + ContainerStatus completedContainer = + createContainerRecoveryReport(am1, 3, ContainerState.COMPLETE); + + nm1.registerNode(Arrays.asList(amContainer, runningContainer, + completedContainer)); + + // Wait for RM to settle down on recovering containers; + Thread.sleep(2000); + // check RMContainers are re-created and the container state is correct. 
+ rm2.waitForState(nm1, amContainer.getContainerId(), + RMContainerState.RUNNING); + rm2.waitForState(nm1, runningContainer.getContainerId(), + RMContainerState.RUNNING); + rm2.waitForContainerToComplete(loadedAttempt1, completedContainer); + + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm2.getResourceScheduler(); + SchedulerNode schedulerNode1 = scheduler.getSchedulerNode(nm1.getNodeId()); + + // ********* check scheduler node state.******* + // 2 running containers. + Resource usedResources = Resources.multiply(containerResource, 2); + Resource nmResource = + Resource.newInstance(nm1.getMemory(), nm1.getvCores()); + + assertTrue(schedulerNode1.isValidContainer(amContainer.getContainerId())); + assertTrue(schedulerNode1.isValidContainer(runningContainer + .getContainerId())); + assertFalse(schedulerNode1.isValidContainer(completedContainer + .getContainerId())); + // 2 launched containers, 1 completed container + assertEquals(2, schedulerNode1.getNumContainers()); + + assertEquals(Resources.subtract(nmResource, usedResources), + schedulerNode1.getAvailableResource()); + assertEquals(usedResources, schedulerNode1.getUsedResource()); + Resource availableResources = Resources.subtract(nmResource, usedResources); + + // ***** check queue state according to the underlying scheduler ******** + Map schedulerApps = + ((AbstractYarnScheduler) rm2.getResourceScheduler()) + .getSchedulerApplications(); + SchedulerApplication schedulerApp = + schedulerApps.get(recoveredApp1.getApplicationId()); + + if (schedulerClass.equals(CapacityScheduler.class)) { + checkCSQueue(schedulerApp, usedResources, availableResources, nmResource); + } else if (schedulerClass.equals(FifoScheduler.class)) { + checkFifoQueue(schedulerApp, usedResources, availableResources); + } + + // *********** check scheduler attempt state.******** + SchedulerApplicationAttempt schedulerAttempt = + schedulerApp.getCurrentAppAttempt(); + assertTrue(schedulerAttempt.getLiveContainers().contains( + 
scheduler.getRMContainer(amContainer.getContainerId()))); + assertTrue(schedulerAttempt.getLiveContainers().contains( + scheduler.getRMContainer(runningContainer.getContainerId()))); + assertEquals(schedulerAttempt.getCurrentConsumption(), usedResources); + assertEquals(availableResources, schedulerAttempt.getHeadroom()); + + // *********** check appSchedulingInfo state *********** + assertEquals(4, schedulerAttempt.getNewContainerId()); + } + + private void checkCSQueue(SchedulerApplication schedulerApp, + Resource usedResources, Resource availableResources, Resource nmResource) + throws Exception { + LeafQueue queue = (LeafQueue) schedulerApp.getQueue(); + assertEquals(usedResources, queue.getUsedResources()); + assertEquals(2, queue.getNumContainers()); + + ResourceCalculator calc = + ((CapacityScheduler) rm2.getResourceScheduler()) + .getResourceCalculator(); + float usedCapacity = + Resources.divide(calc, nmResource, usedResources, nmResource); + assertEquals(usedCapacity, queue.getUsedCapacity(), 0); + assertEquals(usedCapacity, queue.getAbsoluteUsedCapacity(), 0); + assertEquals(usedResources, queue.getUser(schedulerApp.getUser()) + .getConsumedResources()); + + // ************* check Queue metrics ************ + QueueMetrics queueMetrics = queue.getMetrics(); + asserteMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(), + availableResources.getVirtualCores(), usedResources.getMemory(), + usedResources.getVirtualCores()); + + // ************ check user metrics *********** + QueueMetrics userMetrics = + queueMetrics.getUserMetrics(schedulerApp.getUser()); + asserteMetrics(userMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(), + availableResources.getVirtualCores(), usedResources.getMemory(), + usedResources.getVirtualCores()); + } + + private void checkFifoQueue(SchedulerApplication schedulerApp, + Resource usedResources, Resource availableResources) throws Exception { + FifoScheduler scheduler = (FifoScheduler) 
rm2.getResourceScheduler(); + // ************ check cluster used Resources ******** + assertEquals(usedResources, scheduler.getUsedResource()); + + // ************ check app headroom **************** + SchedulerApplicationAttempt schedulerAttempt = + schedulerApp.getCurrentAppAttempt(); + assertEquals(availableResources, schedulerAttempt.getHeadroom()); + + // ************ check queue metrics **************** + QueueMetrics queueMetrics = scheduler.getRootQueueMetrics(); + asserteMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(), + availableResources.getVirtualCores(), usedResources.getMemory(), + usedResources.getVirtualCores()); + } + + // Test RM shuts down, in the meanwhile, AM fails. Restarted RM scheduler + // should not contain the containers that belong to the failed AM. + @Test(timeout = 20000) + public void testAMfailedBetweenRMRestart() throws Exception { + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + nm1.registerNode(); + RMApp app1 = rm1.submitApp(200); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + rm2 = new MockRM(conf, memStore); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + + ContainerStatus amContainer = + createContainerRecoveryReport(am1, 1, ContainerState.COMPLETE); + ContainerStatus runningContainer = + createContainerRecoveryReport(am1, 2, ContainerState.RUNNING); + ContainerStatus completedContainer = + createContainerRecoveryReport(am1, 3, ContainerState.COMPLETE); + nm1.registerNode(Arrays.asList(amContainer, runningContainer, + completedContainer)); + rm2.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED); + // Wait for RM to settle down on recovering containers; + Thread.sleep(2000); + + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm2.getResourceScheduler(); + // 
Previous AM failed; the failed AM should once again release the + // just-recovered containers. + assertNull(scheduler.getRMContainer(runningContainer.getContainerId())); + assertNull(scheduler.getRMContainer(completedContainer.getContainerId())); + } + + // Apps already completed before RM restart. Restarted RM scheduler should not + // recover containers for completed apps. + @Test(timeout = 20000) + public void testContainersNotRecoveredForCompletedApps() throws Exception { + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + nm1.registerNode(); + RMApp app1 = rm1.submitApp(200); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + MockRM.finishAMAndVerifyAppState(app1, rm1, nm1, am1); + + rm2 = new MockRM(conf, memStore); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + ContainerStatus runningContainer = + createContainerRecoveryReport(am1, 2, ContainerState.RUNNING); + ContainerStatus completedContainer = + createContainerRecoveryReport(am1, 3, ContainerState.COMPLETE); + nm1.registerNode(Arrays.asList(runningContainer, completedContainer)); + RMApp recoveredApp1 = + rm2.getRMContext().getRMApps().get(app1.getApplicationId()); + assertEquals(RMAppState.FINISHED, recoveredApp1.getState()); + + // Wait for RM to settle down on recovering containers; + Thread.sleep(2000); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm2.getResourceScheduler(); + // scheduler should not recover containers for finished apps. 
+ assertNull(scheduler.getRMContainer(runningContainer.getContainerId())); + assertNull(scheduler.getRMContainer(completedContainer.getContainerId())); + } + + private ContainerStatus createContainerRecoveryReport(MockAM am, int id, + ContainerState containerState) { + ContainerId containerId = + ContainerId.newInstance(am.getApplicationAttemptId(), id); + ContainerStatus containerReport = + ContainerStatus.newInstance(containerId, containerState, + "recover container", 0); + return containerReport; + } + + private void asserteMetrics(QueueMetrics qm, int appsSubmitted, + int appsPending, int appsRunning, int appsCompleted, + int allocatedContainers, int availableMB, int availableVirtualCores, + int allocatedMB, int allocatedVirtualCores) { + assertEquals(appsSubmitted, qm.getAppsSubmitted()); + assertEquals(appsPending, qm.getAppsPending()); + assertEquals(appsRunning, qm.getAppsRunning()); + assertEquals(appsCompleted, qm.getAppsCompleted()); + assertEquals(allocatedContainers, qm.getAllocatedContainers()); + assertEquals(availableMB, qm.getAvailableMB()); + assertEquals(availableVirtualCores, qm.getAvailableVirtualCores()); + assertEquals(allocatedMB, qm.getAllocatedMB()); + assertEquals(allocatedVirtualCores, qm.getAllocatedVirtualCores()); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java index 8fcbf54..c91cfad 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.net.InetSocketAddress; import java.security.PrivilegedAction; @@ -56,6 +57,7 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS; import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -386,9 +388,13 @@ public void testCreatePreemptedContainerStatus() { public static SchedulerApplication verifyAppAddedAndRemovedFromScheduler( final Map applications, - EventHandler handler, String queueName) throws Exception { + EventHandler handler, String queueName, + RMContext rmContext) throws Exception { ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); + RMApp rmApp = mock(RMApp.class); + when(rmApp.getState()).thenReturn(RMAppState.SUBMITTED); + rmContext.getRMApps().put(appId, rmApp); AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(appId, queueName, "user"); handler.handle(appAddedEvent); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 
d0ba334..c1be757 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -55,6 +55,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.Task; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; @@ -555,6 +559,15 @@ public void testBlackListNodes() throws Exception { ApplicationId appId = BuilderUtils.newApplicationId(100, 1); ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( appId, 1); + + // fake app/attempt to avoid NPE + RMApp rmApp = mock(RMApp.class); + when(rmApp.getState()).thenReturn(RMAppState.SUBMITTED); + RMAppAttempt attempt = mock(RMAppAttempt.class); + when(attempt.getState()).thenReturn(RMAppAttemptState.SUBMITTED); + when(rmApp.getRMAppAttempt(appAttemptId)).thenReturn(attempt); + rm.getRMContext().getRMApps().put(appId, rmApp); + SchedulerEvent addAppEvent = new AppAddedSchedulerEvent(appId, "default", "user"); cs.handle(addAppEvent); @@ -632,14 +645,16 @@ public void testAddAndRemoveAppFromCapacityScheduler() throws Exception { CapacityScheduler cs = new 
CapacityScheduler(); CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); setupQueueConfiguration(conf); - cs.reinitialize(conf, new RMContextImpl(rmDispatcher, null, null, null, - null, null, new RMContainerTokenSecretManager(conf), - new NMTokenSecretManagerInRM(conf), - new ClientToAMTokenSecretManagerInRM(), null)); + RMContext rmContext = + new RMContextImpl(rmDispatcher, null, null, null, null, null, + new RMContainerTokenSecretManager(conf), + new NMTokenSecretManagerInRM(conf), + new ClientToAMTokenSecretManagerInRM(), null); + cs.reinitialize(conf, rmContext); SchedulerApplication app = TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler( - cs.getSchedulerApplications(), cs, "a1"); + cs.getSchedulerApplications(), cs, "a1", rmContext); Assert.assertEquals("a1", app.getQueue().getQueueName()); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index bdf89bb..08aa4df 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -58,7 +58,9 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; @@ -351,6 +353,17 @@ public void testAppAttemptMetrics() throws Exception { final ApplicationAttemptId appAttemptId_0 = TestUtils .getMockApplicationAttemptId(0, 1); + FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, null, + rmContext); + + // fake app/attempt to avoid NPE. + RMApp rmApp = mock(RMApp.class); + when(rmApp.getState()).thenReturn(RMAppState.SUBMITTED); + RMAppAttempt attempt = mock(RMAppAttempt.class); + when(attempt.getState()).thenReturn(RMAppAttemptState.SUBMITTED); + when(rmApp.getRMAppAttempt(appAttemptId_0)).thenReturn(attempt); + rmContext.getRMApps().put(appAttemptId_0.getApplicationId(), rmApp); + AppAddedSchedulerEvent addAppEvent = new AppAddedSchedulerEvent(appAttemptId_0.getApplicationId(), a.getQueueName(), user_0); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 2524763..b907c2e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -2650,7 +2650,8 @@ public void testAddAndRemoveAppFromFairScheduler() throws Exception { FairScheduler 
scheduler = (FairScheduler) resourceManager.getResourceScheduler(); TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler( - scheduler.getSchedulerApplications(), scheduler, "default"); + scheduler.getSchedulerApplications(), scheduler, "default", + resourceManager.getRMContext()); } @Test diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index f6dfc3f..2e842be 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; @@ -252,7 +253,7 @@ public void testUpdateResourceOnNode() throws Exception { FifoScheduler scheduler = new FifoScheduler(){ @SuppressWarnings("unused") - public Map<NodeId, FiCaSchedulerNode> getNodes(){ + public Map<NodeId, SchedulerNode> getNodes(){ return nodes; } }; @@ -598,7 +599,7 @@ public void testAddAndRemoveAppFromFiFoScheduler() throws Exception
{ MockRM rm = new MockRM(conf); FifoScheduler fs = (FifoScheduler)rm.getResourceScheduler(); TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler( - fs.getSchedulerApplications(), fs, "queue"); + fs.getSchedulerApplications(), fs, "queue", rm.getRMContext()); } private void checkApplicationResourceUsage(int expected,