diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 0a11948..6298e3a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -318,6 +318,12 @@ public static final String RECOVERY_ENABLED = RM_PREFIX + "recovery.enabled"; public static final boolean DEFAULT_RM_RECOVERY_ENABLED = false; + public static final String RM_WORK_PRESERVING_RECOVERY_ENABLED = RM_PREFIX + + "work-preserving.recovery.enabled"; + public static final boolean DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED = + false; + + /** Zookeeper interaction configs */ public static final String RM_ZK_PREFIX = RM_PREFIX + "zk-"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 1d40320..b5d26ab 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -32,7 +32,6 @@ import org.apache.hadoop.util.VersionUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; import 
org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -60,6 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -240,13 +240,6 @@ public RegisterNodeManagerResponse registerNodeManager( Resource capability = request.getResource(); String nodeManagerVersion = request.getNMVersion(); - if (!request.getContainerStatuses().isEmpty()) { - LOG.info("received container statuses on node manager register :" - + request.getContainerStatuses()); - for (ContainerStatus containerStatus : request.getContainerStatuses()) { - handleContainerStatus(containerStatus); - } - } RegisterNodeManagerResponse response = recordFactory .newRecordInstance(RegisterNodeManagerResponse.class); @@ -305,7 +298,7 @@ public RegisterNodeManagerResponse registerNodeManager( RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode); if (oldNode == null) { this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeEvent(nodeId, RMNodeEventType.STARTED)); + new RMNodeStartEvent(nodeId, request.getContainerStatuses())); } else { LOG.info("Reconnect from the node at: " + host); this.nmLivelinessMonitor.unregister(nodeId); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index f4f2e20..523be7f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; @@ -199,12 +200,6 @@ new FinalSavingTransition(FINISHED_TRANSITION, RMAppState.FINISHED)) .addTransition(RMAppState.ACCEPTED, RMAppState.KILLING, RMAppEventType.KILL, new KillAttemptTransition()) - // ACCECPTED state can once again receive APP_ACCEPTED event, because on - // recovery the app returns ACCEPTED state and the app once again go - // through the scheduler and triggers one more APP_ACCEPTED event at - // ACCEPTED state. 
- .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, - RMAppEventType.APP_ACCEPTED) // Transitions from RUNNING state .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, @@ -763,6 +758,11 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { return RMAppState.SUBMITTED; } + // Add the current attempt to the scheduler. It'll be removed from + // scheduler in RMAppAttempt#BaseFinalTransition + app.handler.handle(new AppAttemptAddedSchedulerEvent(app.currentAttempt + .getAppAttemptId(), false)); + // YARN-1507 is saving the application state after the application is // accepted. So after YARN-1507, an app is saved meaning it is accepted. // Thus we return ACCECPTED state on recovery. @@ -1055,8 +1055,12 @@ public void transition(RMAppImpl app, RMAppEvent event) { if (app.finishTime == 0 ) { app.finishTime = System.currentTimeMillis(); } - app.handler.handle(new AppRemovedSchedulerEvent(app.applicationId, - finalState)); + // Recovered apps that were completed were not added to scheduler, so no + // need to remove them from scheduler. 
+ if (app.recoveredFinalState == null) { + app.handler.handle(new AppRemovedSchedulerEvent(app.applicationId, + finalState)); + } app.handler.handle( new RMAppManagerEvent(app.applicationId, RMAppManagerEventType.APP_COMPLETED)); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java index b4bad12..e0a5bfa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java @@ -204,4 +204,5 @@ */ ApplicationAttemptReport createApplicationAttemptReport(); + boolean isLastAttempt(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index efe0721..04169cf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -267,15 +267,17 @@ .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING, 
RMAppAttemptEventType.CONTAINER_FINISHED, new FinalSavingTransition( - new AMContainerCrashedTransition(), RMAppAttemptState.FAILED)) + new AMContainerCrashedBeforeRunningTransition(), RMAppAttemptState.FAILED)) // Transitions from LAUNCHED State .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.RUNNING, RMAppAttemptEventType.REGISTERED, new AMRegisteredTransition()) - .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING, + .addTransition(RMAppAttemptState.LAUNCHED, + EnumSet.of(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING), RMAppAttemptEventType.CONTAINER_FINISHED, - new FinalSavingTransition( - new AMContainerCrashedTransition(), RMAppAttemptState.FAILED)) + new ContainerFinishedTransition( + new AMContainerCrashedBeforeRunningTransition(), + RMAppAttemptState.LAUNCHED)) .addTransition( RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING, RMAppAttemptEventType.EXPIRE, @@ -302,7 +304,9 @@ RMAppAttemptState.RUNNING, EnumSet.of(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING), RMAppAttemptEventType.CONTAINER_FINISHED, - new ContainerFinishedTransition()) + new ContainerFinishedTransition( + new AMContainerCrashedAtRunningTransition(), + RMAppAttemptState.RUNNING)) .addTransition( RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING, RMAppAttemptEventType.EXPIRE, @@ -1207,17 +1211,16 @@ public void transition(RMAppAttemptImpl appAttempt, } } - private static final class AMContainerCrashedTransition extends + private static final class AMContainerCrashedBeforeRunningTransition extends BaseFinalTransition { - public AMContainerCrashedTransition() { + public AMContainerCrashedBeforeRunningTransition() { super(RMAppAttemptState.FAILED); } @Override public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { - RMAppAttemptContainerFinishedEvent finishEvent = ((RMAppAttemptContainerFinishedEvent)event); @@ -1410,6 +1413,16 @@ public void transition(RMAppAttemptImpl appAttempt, implements 
MultipleArcTransition { + // The transition To Do after attempt final state is saved. + private BaseTransition transitionToDo; + private RMAppAttemptState currentState; + + public ContainerFinishedTransition(BaseTransition transitionToDo, + RMAppAttemptState currentState) { + this.transitionToDo = transitionToDo; + this.currentState = currentState; + } + @Override public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { @@ -1426,14 +1439,13 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, containerStatus.getContainerId())) { // Remember the follow up transition and save the final attempt state. appAttempt.rememberTargetTransitionsAndStoreState(event, - new ContainerFinishedFinalStateSavedTransition(), - RMAppAttemptState.FAILED, RMAppAttemptState.FAILED); + transitionToDo, RMAppAttemptState.FAILED, RMAppAttemptState.FAILED); return RMAppAttemptState.FINAL_SAVING; } // Normal container.Put it in completedcontainers list appAttempt.justFinishedContainers.add(containerStatus); - return RMAppAttemptState.RUNNING; + return this.currentState; } } @@ -1451,7 +1463,7 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, } } - private static class ContainerFinishedFinalStateSavedTransition extends + private static class AMContainerCrashedAtRunningTransition extends BaseTransition { @Override public void @@ -1663,4 +1675,9 @@ public ApplicationAttemptReport createApplicationAttemptReport() { } return attemptReport; } + + @Override + public boolean isLastAttempt() { + return this.isLastAttempt; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java index ace4435..259d68b3 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java @@ -33,5 +33,7 @@ RELEASED, // Source: ContainerAllocationExpirer - EXPIRE + EXPIRE, + + RECOVER } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index 2921891..9c7322e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; +import org.apache.hadoop.yarn.state.MultipleArcTransition; import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; @@ -65,6 +66,9 @@ RMContainerEventType.KILL) .addTransition(RMContainerState.NEW, RMContainerState.RESERVED, RMContainerEventType.RESERVED, new ContainerReservedTransition()) + .addTransition(RMContainerState.NEW, + EnumSet.of(RMContainerState.RUNNING, 
RMContainerState.COMPLETED), + RMContainerEventType.RECOVER, new ContainerRecoveredTransition()) // Transitions from RESERVED state .addTransition(RMContainerState.RESERVED, RMContainerState.RESERVED, @@ -341,6 +345,29 @@ public void transition(RMContainerImpl cont, RMContainerEvent event) { } } + private static final class ContainerRecoveredTransition + implements + MultipleArcTransition { + @Override + public RMContainerState transition(RMContainerImpl container, + RMContainerEvent event) { + ContainerStatus containerStatus = ((RMContainerRecoverEvent) event).getContainerStatus(); + if (containerStatus.getState().equals(ContainerState.COMPLETE)) { + new FinishedTransition().transition(container, + new RMContainerFinishedEvent(container.containerId, containerStatus, + RMContainerEventType.FINISHED)); + return RMContainerState.COMPLETED; + } else if (containerStatus.getState() + .equals(ContainerState.RUNNING)) { + return RMContainerState.RUNNING; + } else { + LOG.warn("RMContainer received unexpected recover event with container" + + " state " + containerStatus.getState() + " while recovering."); + return RMContainerState.RUNNING; + } + } + } + private static final class ContainerReservedTransition extends BaseTransition { @@ -398,7 +425,6 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { // Inform AppAttempt container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent( container.appAttemptId, finishedEvent.getRemoteContainerStatus())); - container.rmContext.getRMApplicationHistoryWriter() .containerFinished(container); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerRecoverEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerRecoverEvent.java new file mode 100644 index 
0000000..f90723d --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerRecoverEvent.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; + +public class RMContainerRecoverEvent extends RMContainerEvent { + + private final ContainerStatus containerStatus; + + public RMContainerRecoverEvent(ContainerId containerId, + ContainerStatus containerStatus) { + super(containerId, RMContainerEventType.RECOVER); + this.containerStatus = containerStatus; + } + + public ContainerStatus getContainerStatus() { + return containerStatus; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index dc53a5d..485c3be 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -460,13 +460,9 @@ private void updateMetricsForDeactivatedNode(NodeState initialState, @Override public void transition(RMNodeImpl rmNode, RMNodeEvent event) { // Inform the scheduler + RMNodeStartEvent startEvent = (RMNodeStartEvent) event; + List containers = null; - rmNode.context.getDispatcher().getEventHandler().handle( - new NodeAddedSchedulerEvent(rmNode)); - rmNode.context.getDispatcher().getEventHandler().handle( - new NodesListManagerEvent( - NodesListManagerEventType.NODE_USABLE, rmNode)); - String host = rmNode.nodeId.getHost(); if (rmNode.context.getInactiveRMNodes().containsKey(host)) { // Old node rejoining @@ -476,10 +472,17 @@ public void 
transition(RMNodeImpl rmNode, RMNodeEvent event) { } else { // Increment activeNodes explicitly because this is a new node. ClusterMetrics.getMetrics().incrNumActiveNodes(); + containers = startEvent.getContainerStatuses(); } + + rmNode.context.getDispatcher().getEventHandler() + .handle(new NodeAddedSchedulerEvent(rmNode, containers)); + rmNode.context.getDispatcher().getEventHandler().handle( + new NodesListManagerEvent( + NodesListManagerEventType.NODE_USABLE, rmNode)); } } - + public static class ReconnectNodeTransition implements SingleArcTransition { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartEvent.java new file mode 100644 index 0000000..379666d --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartEvent.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmnode; + +import java.util.List; + +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; + +public class RMNodeStartEvent extends RMNodeEvent { + + private List containerStatuses; + + public RMNodeStartEvent(NodeId nodeId, List containerStatuses) { + super(nodeId, RMNodeEventType.STARTED); + this.containerStatuses = containerStatuses; + } + + public List getContainerStatuses() { + return this.containerStatuses; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 0f3af41..29e38be 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -22,21 +22,39 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; 
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.util.resource.Resources; public abstract class AbstractYarnScheduler implements ResourceScheduler { + private static final Log LOG = LogFactory.getLog(AbstractYarnScheduler.class); + protected RMContext rmContext; protected Map applications; + protected Map nodes = + new ConcurrentHashMap(); + + // whole capacity of the cluster + protected Resource clusterResource = Resource.newInstance(0, 0); + protected final static List EMPTY_CONTAINER_LIST = new ArrayList(); protected static final Allocation EMPTY_ALLOCATION = new Allocation( @@ -74,4 +92,80 @@ public String moveApplication(ApplicationId appId, String newQueue) throw new YarnException(getClass().getSimpleName() + " does not support moving apps between queues"); } + + public void recoverContainersOnNode(List containerStatuses, + RMNode nm) { + if (containerStatuses == null + || (containerStatuses != null && containerStatuses.isEmpty())) { + return; + } + + for (ContainerStatus status : containerStatuses) { + ApplicationId appId = + status.getContainerId().getApplicationAttemptId().getApplicationId(); + RMApp rmApp = rmContext.getRMApps().get(appId); + if (rmApp == null) { + LOG.error("Skip recovering container " + status + + " for unknown application."); + continue; + } + + if (rmApp.getApplicationSubmissionContext().getUnmanagedAM()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skip recovering container " + status + + " for unmanaged AM." 
+ rmApp.getApplicationId()); + } + continue; + } + + SchedulerApplication schedulerApp = applications.get(appId); + if (schedulerApp == null) { + LOG.info("Skip recovering container " + status + + " for unknown SchedulerApplication. Application state is " + + rmApp.getState()); + continue; + } + + LOG.info("Recovering container " + status); + SchedulerApplicationAttempt schedulerAttempt = + schedulerApp.getCurrentAppAttempt(); + + // create container + RMContainer rmContainer = createContainer(status, nm); + + // recover RMContainer + rmContainer.handle(new RMContainerRecoverEvent(status.getContainerId(), + status)); + + // recover scheduler node + nodes.get(nm.getNodeID()).recoverContainer(rmContainer); + + // recover queue: update headroom etc. + Queue queue = schedulerAttempt.getQueue(); + queue.recoverContainer(clusterResource, schedulerAttempt, rmContainer); + + // recover scheduler attempt + schedulerAttempt.recoverContainer(rmContainer); + + // recover app scheduling info + schedulerAttempt.appSchedulingInfo.recoverContainer(rmContainer); + } + } + + public RMContainer createContainer(ContainerStatus status, RMNode node) { + Container container = + Container.newInstance(status.getContainerId(), node.getNodeID(), + node.getHttpAddress(), Resource.newInstance(1024, 1), + Priority.newInstance(0), null); + ApplicationAttemptId attemptId = + container.getId().getApplicationAttemptId(); + RMContainer rmContainer = + new RMContainerImpl(container, attemptId, node.getNodeID(), + applications.get(attemptId.getApplicationId()).getUser(), rmContext); + return rmContainer; + } + + public SchedulerNode getSchedulerNode(NodeId nodeId) { + return nodes.get(nodeId); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index de71f71..f0f7df6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -39,6 +39,8 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.util.resource.Resources; /** @@ -409,4 +411,28 @@ public synchronized void transferStateFromPreviousAppSchedulingInfo( // this.requests = appInfo.getRequests(); this.blacklist = appInfo.getBlackList(); } + + public synchronized void recoverContainer(RMContainer rmContainer) { + int containerId = rmContainer.getContainerId().getId(); + // set the containerId counter to be the max of all the Ids of recovered + // containers. + if (containerId >= containerIdCounter.get()) { + containerIdCounter.set(containerId); + } + if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { + return; + } + + QueueMetrics metrics = queue.getMetrics(); + if (pending) { + // If there were any running containers, the application was + // running from scheduler's POV. 
+ pending = false; + metrics.runAppAttempt(applicationId, user); + } + if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { + return; + } + metrics.allocateResources(user, 1, Resource.newInstance(1024, 1), false); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java index c51f819..f6fb59a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java @@ -26,6 +26,8 @@ import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @Evolving @LimitedPrivate("yarn") @@ -60,4 +62,13 @@ boolean hasAccess(QueueACL acl, UserGroupInformation user); public ActiveUsersManager getActiveUsersManager(); + + /** + * Recover the state of the queue + * @param clusterResource the resource of the cluster + * @param schedulerAttempt the application attempt for which the container was allocated + * @param rmContainer the container that was recovered. 
+ */ + public void recoverContainer(Resource clusterResource, + SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index fc7e047..7038705 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.util.resource.Resources; @@ -499,5 +500,22 @@ public synchronized void move(Queue newQueue) { appSchedulingInfo.move(newQueue); this.queue = newQueue; - } + } + + public synchronized void recoverContainer(RMContainer rmContainer) { + if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { + return; + } + LOG.info("SchedulerAttempt " + getApplicationAttemptId() + + " is recovering container " + rmContainer.getContainerId()); + liveContainers.put(rmContainer.getContainerId(), rmContainer); + Resources.addTo(currentConsumption, rmContainer.getContainer() + 
.getResource()); + // resourceLimit: updated when LeafQueue#recoverContainer#allocateResource + // is called. + +// newlyAllocatedContainers.add(rmContainer); +// schedulingOpportunities +// lastScheduledContainer + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 524b1ab..e1416a7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -20,9 +20,11 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; /** * Represents a YARN Cluster Node from the viewpoint of the scheduler. 
@@ -86,4 +88,7 @@ */ public abstract NodeId getNodeID(); + public abstract void recoverContainer(RMContainer rmContainer); + + public abstract boolean isValidContainer(ContainerId containerId); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index f5090ba..ccb71e2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -28,7 +28,6 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueState; @@ -235,15 +234,6 @@ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) public ActiveUsersManager getActiveUsersManager(); /** - * Recover the state of the queue - * @param clusterResource the resource of the cluster - * @param application the application for which the container was allocated - * @param container the container that was recovered. - */ - public void recoverContainer(Resource clusterResource, FiCaSchedulerApp application, - Container container); - - /** * Adds all applications in the queue and its subqueues to the given collection. 
* @param apps the collection to add the applications to */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index e28c18c..096de7d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -53,7 +53,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -77,6 +76,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; @@ -182,11 +182,6 @@ public Configuration getConf() { 
private Map queues = new ConcurrentHashMap(); - private Map nodes = - new ConcurrentHashMap(); - - private Resource clusterResource = - RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class); private int numNodeManagers = 0; private Resource minimumAllocation; @@ -331,16 +326,16 @@ long getAsyncScheduleInterval() { static void schedule(CapacityScheduler cs) { // First randomize the start point int current = 0; - Collection nodes = cs.getAllNodes().values(); + Collection nodes = cs.getAllNodes().values(); int start = random.nextInt(nodes.size()); - for (FiCaSchedulerNode node : nodes) { + for (SchedulerNode node : nodes) { if (current++ >= start) { - cs.allocateContainersToNode(node); + cs.allocateContainersToNode((FiCaSchedulerNode)node); } } // Now, just get everyone to be safe - for (FiCaSchedulerNode node : nodes) { - cs.allocateContainersToNode(node); + for (SchedulerNode node : nodes) { + cs.allocateContainersToNode((FiCaSchedulerNode)node); } try { Thread.sleep(cs.getAsyncScheduleInterval()); @@ -541,8 +536,12 @@ private synchronized void addApplication(ApplicationId applicationId, applications.put(applicationId, application); LOG.info("Accepted application " + applicationId + " from user: " + user + ", in queue: " + queueName); - rmContext.getDispatcher().getEventHandler() + // No need to re-send app_accepted event to recovered apps. + if (!rmContext.getRMApps().get(applicationId).getState() + .equals(RMAppState.ACCEPTED)) { + rmContext.getDispatcher().getEventHandler() .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); + } } private synchronized void addApplicationAttempt( @@ -565,9 +564,16 @@ private synchronized void addApplicationAttempt( LOG.info("Added Application Attempt " + applicationAttemptId + " to scheduler from user " + application.getUser() + " in queue " + queue.getQueueName()); + + // If attempt is already at launched state, that means this is a recovered + // attempt. 
So no need to re-send attempt_added event to recovered attempts. + if (!rmContext.getRMApps().get(applicationAttemptId.getApplicationId()) + .getRMAppAttempt(applicationAttemptId).getState() + .equals(RMAppAttemptState.LAUNCHED)) { rmContext.getDispatcher().getEventHandler() .handle( new RMAppAttemptEvent(applicationAttemptId, RMAppAttemptEventType.ATTEMPT_ADDED)); + } } private synchronized void doneApplication(ApplicationId applicationId, @@ -861,6 +867,8 @@ public void handle(SchedulerEvent event) { { NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event; addNode(nodeAddedEvent.getAddedRMNode()); + recoverContainersOnNode(nodeAddedEvent.getContainerStatuses(), + nodeAddedEvent.getAddedRMNode()); } break; case NODE_REMOVED: @@ -942,7 +950,7 @@ private synchronized void addNode(RMNode nodeManager) { } private synchronized void removeNode(RMNode nodeInfo) { - FiCaSchedulerNode node = this.nodes.get(nodeInfo.getNodeID()); + FiCaSchedulerNode node = (FiCaSchedulerNode)this.nodes.get(nodeInfo.getNodeID()); if (node == null) { return; } @@ -1041,11 +1049,11 @@ public ApplicationResourceUsageReport getAppResourceUsageReport( @Lock(Lock.NoLock.class) FiCaSchedulerNode getNode(NodeId nodeId) { - return nodes.get(nodeId); + return (FiCaSchedulerNode)nodes.get(nodeId); } @Lock(Lock.NoLock.class) - Map getAllNodes() { + Map getAllNodes() { return nodes; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 971edb8..2a474d7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -59,15 +59,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.server.utils.Lock.NoLock; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; +import com.google.common.annotations.VisibleForTesting; + @Private @Unstable public class LeafQueue implements CSQueue { @@ -564,7 +566,8 @@ public String toString() { "numContainers=" + getNumContainers(); } - private synchronized User getUser(String userName) { + @VisibleForTesting + public synchronized User getUser(String userName) { User user = users.get(userName); if (user == null) { user = new User(); @@ -1446,7 +1449,7 @@ public void completedContainer(Resource clusterResource, } synchronized void allocateResource(Resource clusterResource, - FiCaSchedulerApp application, Resource resource) { + SchedulerApplicationAttempt application, Resource resource) { // Update queue metrics Resources.addTo(usedResources, resource); CSQueueUtils.updateQueueStatistics( @@ -1530,7 +1533,8 @@ public QueueMetrics getMetrics() { return metrics; } - static class User { + @VisibleForTesting + public static class 
User { Resource consumed = Resources.createResource(0, 0); int pendingApplications = 0; int activeApplications = 0; @@ -1580,13 +1584,16 @@ public synchronized void releaseContainer(Resource resource) { @Override public void recoverContainer(Resource clusterResource, - FiCaSchedulerApp application, Container container) { + SchedulerApplicationAttempt attempt, RMContainer rmContainer) { + if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { + return; + } // Careful! Locking order is important! synchronized (this) { - allocateResource(clusterResource, application, container.getResource()); + allocateResource(clusterResource, attempt, rmContainer.getContainer() + .getResource()); } - getParent().recoverContainer(clusterResource, application, container); - + getParent().recoverContainer(clusterResource, attempt, rmContainer); } /** @@ -1613,5 +1620,4 @@ public void collectSchedulerApplications( apps.add(app.getApplicationAttemptId()); } } - } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 1f09475..19f550c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -38,7 +38,6 @@ import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; 
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -49,9 +48,11 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -770,13 +771,16 @@ public QueueMetrics getMetrics() { @Override public void recoverContainer(Resource clusterResource, - FiCaSchedulerApp application, Container container) { + SchedulerApplicationAttempt attempt, RMContainer rmContainer) { + if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { + return; + } // Careful! Locking order is important! 
synchronized (this) { - allocateResource(clusterResource, container.getResource()); + allocateResource(clusterResource,rmContainer.getContainer().getResource()); } if (parent != null) { - parent.recoverContainer(clusterResource, application, container); + parent.recoverContainer(clusterResource, attempt, rmContainer); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java index 58f12d0..8041283 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java @@ -35,11 +35,14 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.util.resource.Resources; +import com.google.common.annotations.VisibleForTesting; + public class FiCaSchedulerNode extends SchedulerNode { private static final Log LOG = LogFactory.getLog(FiCaSchedulerNode.class); @@ -135,8 +138,10 @@ public Resource getTotalResource() { return 
this.totalResourceCapability; } - private synchronized boolean isValidContainer(Container c) { - if (launchedContainers.containsKey(c.getId())) + @Override + @VisibleForTesting + public synchronized boolean isValidContainer(ContainerId containerId) { + if (launchedContainers.containsKey(containerId)) return true; return false; } @@ -151,7 +156,7 @@ private synchronized void updateResource(Container container) { * @param container container to be released */ public synchronized void releaseContainer(Container container) { - if (!isValidContainer(container)) { + if (!isValidContainer(container.getId())) { LOG.error("Invalid container released " + container); return; } @@ -282,4 +287,11 @@ public synchronized void applyDeltaOnAvailableResource(Resource deltaResource) { Resources.addTo(this.availableResource, deltaResource); } + @Override + public void recoverContainer(RMContainer rmContainer) { + if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { + return; + } + allocateContainer(null, rmContainer); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java index c487f48..825ca78 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java @@ -18,19 +18,34 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; +import java.util.List; + +import 
org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; public class NodeAddedSchedulerEvent extends SchedulerEvent { private final RMNode rmNode; + private final List containerStatuses; public NodeAddedSchedulerEvent(RMNode rmNode) { super(SchedulerEventType.NODE_ADDED); this.rmNode = rmNode; + this.containerStatuses = null; + } + + public NodeAddedSchedulerEvent(RMNode rmNode, + List containerStatuses) { + super(SchedulerEventType.NODE_ADDED); + this.rmNode = rmNode; + this.containerStatuses = containerStatuses; } public RMNode getAddedRMNode() { return rmNode; } + public List getContainerStatuses() { + return containerStatuses; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index e842a6a..b396b07 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -33,8 +33,10 @@ import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import 
org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; @@ -254,4 +256,11 @@ public int getNumRunnableApps() { public ActiveUsersManager getActiveUsersManager() { return activeUsersManager; } + + @Override + public void recoverContainer(Resource clusterResource, + SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) { + // TODO Auto-generated method stub + + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index 427cb86..67e5104 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -33,7 +33,9 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @Private @Unstable @@ -200,4 +202,11 @@ public ActiveUsersManager getActiveUsersManager() { // Should never be called since all applications are submitted to LeafQueues return null; } + + @Override + public void recoverContainer(Resource clusterResource, + SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) { 
+ // TODO Auto-generated method stub + + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java index 97ea6d4..82529ff 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java @@ -37,6 +37,8 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer + .RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.util.resource.Resources; @@ -133,8 +135,8 @@ public synchronized Resource getUsedResource() { return usedResource; } - private synchronized boolean isValidContainer(Container c) { - if (launchedContainers.containsKey(c.getId())) { + public synchronized boolean isValidContainer(ContainerId containerId) { + if (launchedContainers.containsKey(containerId)) { return true; } return false; @@ -150,7 +152,7 @@ private synchronized void updateResource(Container container) { * @param container container to be released */ public synchronized void releaseContainer(Container container) { - if (!isValidContainer(container)) { + if (!isValidContainer(container.getId())) { LOG.error("Invalid 
container released " + container); return; } @@ -275,5 +277,12 @@ public synchronized void applyDeltaOnAvailableResource(Resource deltaResource) { // we can only adjust available resource if total resource is changed. Resources.addTo(this.availableResource, deltaResource); } - + + @Override + public void recoverContainer(RMContainer rmContainer) { + if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { + return; + } + allocateContainer(null, rmContainer); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index fab9ebe..a0c2a19 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -152,10 +152,6 @@ // Time we last ran preemptTasksIfNecessary private long lastPreemptCheckTime; - // Nodes in the cluster, indexed by NodeId - private Map nodes = - new ConcurrentHashMap(); - // Aggregate capacity of the cluster private Resource clusterCapacity = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class); @@ -809,7 +805,7 @@ private synchronized void completedContainer(RMContainer rmContainer, } // Get the node on which the container was allocated - FSSchedulerNode node = nodes.get(container.getNodeId()); + FSSchedulerNode node = (FSSchedulerNode) nodes.get(container.getNodeId()); if (rmContainer.getState() == RMContainerState.RESERVED) { application.unreserve(node, rmContainer.getReservedPriority()); 
@@ -835,7 +831,7 @@ private synchronized void addNode(RMNode node) { } private synchronized void removeNode(RMNode rmNode) { - FSSchedulerNode node = nodes.get(rmNode.getNodeID()); + FSSchedulerNode node = (FSSchedulerNode) nodes.get(rmNode.getNodeID()); // This can occur when an UNHEALTHY node reconnects if (node == null) { return; @@ -964,7 +960,7 @@ private synchronized void nodeUpdate(RMNode nm) { LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterCapacity); } eventLog.log("HEARTBEAT", nm.getHostName()); - FSSchedulerNode node = nodes.get(nm.getNodeID()); + FSSchedulerNode node = (FSSchedulerNode) nodes.get(nm.getNodeID()); // Update resource if any change SchedulerUtils.updateResourceIfChanged(node, nm, clusterCapacity, LOG); @@ -1012,7 +1008,7 @@ private void continuousScheduling() { // iterate all nodes for (NodeId nodeId : nodeIdList) { if (nodes.containsKey(nodeId)) { - FSSchedulerNode node = nodes.get(nodeId); + FSSchedulerNode node = (FSSchedulerNode) nodes.get(nodeId); try { if (Resources.fitsIn(minimumAllocation, node.getAvailableResource())) { @@ -1091,7 +1087,7 @@ private synchronized void attemptScheduling(FSSchedulerNode node) { @Override public SchedulerNodeReport getNodeReport(NodeId nodeId) { - FSSchedulerNode node = nodes.get(nodeId); + FSSchedulerNode node = (FSSchedulerNode) nodes.get(nodeId); return node == null ? 
null : new SchedulerNodeReport(node); } @@ -1156,6 +1152,8 @@ public void handle(SchedulerEvent event) { } NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event; addNode(nodeAddedEvent.getAddedRMNode()); + recoverContainersOnNode(nodeAddedEvent.getContainerStatuses(), + nodeAddedEvent.getAddedRMNode()); break; case NODE_REMOVED: if (!(event instanceof NodeRemovedSchedulerEvent)) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 82000e1..f73710c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -79,6 +79,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; @@ -185,6 +186,13 @@ public boolean hasAccess(QueueACL acl, UserGroupInformation user) { public ActiveUsersManager getActiveUsersManager() { return activeUsersManager; } + + 
@Override + public void recoverContainer(Resource clusterResource, + SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) { + // TODO Auto-generated method stub + + } }; @Override diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index cfd05f9..d28043d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -23,8 +23,6 @@ import java.util.ArrayList; import java.util.List; -import org.junit.Assert; - import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; @@ -34,6 +32,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; @@ -44,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.util.Records; +import org.junit.Assert; public class MockAM { @@ -140,6 +140,24 @@ public AllocateResponse allocate( return allocate(reqs, releases); } + public List 
allocateContainers(int numContainers, + int memory, MockNM nm) throws Exception { + allocate(nm.getNodeId().getHost(), memory, numContainers, + new ArrayList()); + List containers = new ArrayList(); + int timeoutSecs = 0; + do { + nm.nodeHeartbeat(true); + containers.addAll(allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers()); + Thread.sleep(200); + System.out.println("Attempt " + this.attemptId + " is waiting for " + + numContainers + " containers to be allocated. Currently has " + + containers.size() + " containers allocated."); + } while (containers.size() != numContainers && timeoutSecs++ < 60); + return containers; + } + public List createReq(String[] hosts, int memory, int priority, int containers) throws Exception { List reqs = new ArrayList(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 1dcac06..f0002dc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -185,4 +185,11 @@ public NodeHeartbeatResponse nodeHeartbeat(Map containers = attempt.getJustFinishedContainers(); + System.out.println("Received completed containers " + containers); + if (containers.contains(completedContainer)) { + break; + } + Thread.sleep(200); + } + } + public void waitForState(MockNM nm, ContainerId containerId, RMContainerState containerState) throws Exception { RMContainer container = getResourceScheduler().getRMContainer(containerId); int timeoutSecs = 0; - while(container == null && timeoutSecs++ < 20) 
{ + while(container == null && timeoutSecs++ < 100) { nm.nodeHeartbeat(true); container = getResourceScheduler().getRMContainer(containerId); System.out.println("Waiting for container " + containerId + " to be allocated."); @@ -542,4 +556,11 @@ public ApplicationReport getApplicationReport(ApplicationId appId) .newInstance(appId)); return response.getApplicationReport(); } + + // Explicitly reset queue metrics for testing. + @SuppressWarnings("static-access") + public void clearQueueMetrics(RMApp app) { + ((AbstractYarnScheduler) getResourceScheduler()).getSchedulerApplications() + .get(app.getApplicationId()).getQueue().getMetrics().clearQueueMetrics(); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java new file mode 100644 index 0000000..63bbe21 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -0,0 +1,291 @@ +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.junit.Before; +import org.junit.Test; + +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class TestWorkPreservingRMRestart { + + private YarnConfiguration conf; + MockRM rm1 = null; + MockRM rm2 = null; + + protected void setScheduler(YarnConfiguration conf) { + conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class, + ResourceScheduler.class); + } + + @Before + public void setup() throws UnknownHostException { + Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + 
conf = new YarnConfiguration(); + UserGroupInformation.setConfiguration(conf); + conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + setScheduler(conf); + conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true); + DefaultMetricsSystem.setMiniClusterMode(true); + } + + public void tearDown() { + if (rm1 != null) { + rm1.stop(); + } + if (rm2 != null) { + rm2.stop(); + } + } + + // Test scheduler state including SchedulerAttempt, SchedulerNode, + // AppSchedulingInfo and Queue state can be reconstructed via the container + // recovery reports on NM re-registration. + // Test Strategy: send 3 container recovery reports(AMContainer, running + // container, completed container) on NM re-registration, check the states of + // SchedulerAttempt, SchedulerNode etc. are updated accordingly. + @Test + public void testSchedulerStateRecoveryOnRMRestart() throws Exception { + + int containerMemory = 1024; + Resource containerResource = Resource.newInstance(containerMemory, 1); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + nm1.registerNode(); + RMApp app1 = rm1.submitApp(200); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // clear queue metrics + rm1.clearQueueMetrics(app1); + + // Re-start RM + rm2 = new MockRM(conf, memStore); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + // recover app + RMApp recoveredApp1 = + rm2.getRMContext().getRMApps().get(app1.getApplicationId()); + RMAppAttempt loadedAttempt1 = recoveredApp1.getCurrentAppAttempt(); + ContainerStatus amContainer = + createContainerRecoveryReport(am1, 1, ContainerState.RUNNING); + ContainerStatus runningContainer = + createContainerRecoveryReport(am1, 2, ContainerState.RUNNING); + ContainerStatus 
completedContainer = + createContainerRecoveryReport(am1, 3, ContainerState.COMPLETE); + + nm1.registerNode(Arrays.asList(amContainer, runningContainer, + completedContainer)); + + // Wait for RM to settle down on recovering containers; + Thread.sleep(2000); + // check RMContainers are re-recreated and the container state is correct. + rm2.waitForState(nm1, amContainer.getContainerId(), + RMContainerState.RUNNING); + rm2.waitForState(nm1, runningContainer.getContainerId(), + RMContainerState.RUNNING); + rm2.waitForContainerToComplete(loadedAttempt1, completedContainer); + + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm2.getResourceScheduler(); + SchedulerNode schedulerNode1 = scheduler.getSchedulerNode(nm1.getNodeId()); + + //********* check scheduler node state.******* + // 2 running containers. + Resource usedResources = Resources.multiply(containerResource, 2); + Resource nmResource = + Resource.newInstance(nm1.getMemory(), nm1.getvCores()); + + assertTrue(schedulerNode1.isValidContainer(amContainer.getContainerId())); + assertTrue(schedulerNode1.isValidContainer(runningContainer + .getContainerId())); + assertFalse(schedulerNode1.isValidContainer(completedContainer + .getContainerId())); + // 2 launched containers, 1 completed container + assertEquals(2, schedulerNode1.getNumContainers()); + + assertEquals(Resources.subtract(nmResource, usedResources), + schedulerNode1.getAvailableResource()); + assertEquals(usedResources, schedulerNode1.getUsedResource()); + + //*********** check queue state. 
********** + Map schedulerApps = + ((AbstractYarnScheduler) rm2.getResourceScheduler()) + .getSchedulerApplications(); + SchedulerApplication schedulerApp = + schedulerApps.get(recoveredApp1.getApplicationId()); + Queue queue = schedulerApp.getQueue(); + assertQueueMetrics(rm2, queue, nmResource, recoveredApp1, usedResources); + //************* check Queue metrics ************ + QueueMetrics queueMetrics = queue.getMetrics(); + Resource availableResources = Resources.subtract(nmResource, usedResources); + asserteMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(), + availableResources.getVirtualCores(), usedResources.getMemory(), + usedResources.getVirtualCores()); + + //************ check user metrics *********** + QueueMetrics userMetrics = + queueMetrics.getUserMetrics(recoveredApp1.getUser()); + asserteMetrics(userMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(), + availableResources.getVirtualCores(), usedResources.getMemory(), + usedResources.getVirtualCores()); + + //*********** check scheduler attempt state.******** + SchedulerApplicationAttempt schedulerAttempt = + schedulerApp.getCurrentAppAttempt(); + assertSchedulerAttemptState(amContainer, runningContainer, scheduler, + usedResources, availableResources, schedulerAttempt); + + //*********** check appSchedulingInfo state *********** + assertEquals(4, schedulerAttempt.getNewContainerId()); + } + + protected void assertSchedulerAttemptState(ContainerStatus amContainer, + ContainerStatus runningContainer, + AbstractYarnScheduler scheduler, Resource usedResources, Resource availableResources, SchedulerApplicationAttempt schedulerAttempt) { + assertTrue(schedulerAttempt.getLiveContainers().contains( + scheduler.getRMContainer(amContainer.getContainerId()))); + assertTrue(schedulerAttempt.getLiveContainers().contains( + scheduler.getRMContainer(runningContainer.getContainerId()))); + assertEquals(schedulerAttempt.getCurrentConsumption(), usedResources); + } + + // Test RM shuts down, in 
the meanwhile, AM fails. Restarted RM scheduler + // should not contain the containers that belong to the failed AM. + @Test(timeout = 20000) + public void testAMfailedBetweenRMRestart() throws Exception { + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + nm1.registerNode(); + RMApp app1 = rm1.submitApp(200); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + rm2 = new MockRM(conf, memStore); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + + ContainerStatus amContainer = + createContainerRecoveryReport(am1, 1, ContainerState.COMPLETE); + ContainerStatus runningContainer = + createContainerRecoveryReport(am1, 2, ContainerState.RUNNING); + ContainerStatus completedContainer = + createContainerRecoveryReport(am1, 3, ContainerState.COMPLETE); + nm1.registerNode(Arrays.asList(amContainer, runningContainer, + completedContainer)); + rm2.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED); + // Wait for RM to settle down on recovering containers; + Thread.sleep(2000); + + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm2.getResourceScheduler(); + // Previous AM failed, The failed AM should once again release the + // just-recovered containers. + assertNull(scheduler.getRMContainer(runningContainer.getContainerId())); + assertNull(scheduler.getRMContainer(completedContainer.getContainerId())); + } + + // Apps already completed before RM restart. Restarted RM scheduler should not + // recover containers for completed apps. 
+ @Test(timeout = 20000) + public void testContainersNotRecoveredForCompletedApps() throws Exception { + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + nm1.registerNode(); + RMApp app1 = rm1.submitApp(200); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + MockRM.finishAMAndVerifyAppState(app1, rm1, nm1, am1); + + rm2 = new MockRM(conf, memStore); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + ContainerStatus runningContainer = + createContainerRecoveryReport(am1, 2, ContainerState.RUNNING); + ContainerStatus completedContainer = + createContainerRecoveryReport(am1, 3, ContainerState.COMPLETE); + nm1.registerNode(Arrays.asList(runningContainer, completedContainer)); + RMApp recoveredApp1 = + rm2.getRMContext().getRMApps().get(app1.getApplicationId()); + assertEquals(RMAppState.FINISHED, recoveredApp1.getState()); + + // Wait for RM to settle down on recovering containers; + Thread.sleep(2000); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm2.getResourceScheduler(); + // scheduler should not recover containers for finished apps. 
+ assertNull(scheduler.getRMContainer(runningContainer.getContainerId())); + assertNull(scheduler.getRMContainer(completedContainer.getContainerId())); + } + + private ContainerStatus createContainerRecoveryReport(MockAM am, int id, + ContainerState containerState) { + ContainerId containerId = + ContainerId.newInstance(am.getApplicationAttemptId(), id); + ContainerStatus containerReport = + ContainerStatus.newInstance(containerId, containerState, + "recover container", 0); + return containerReport; + } + + protected void assertQueueMetrics(MockRM rm2, Queue queue, Resource nmResource, RMApp recoveredApp1, Resource usedResources) { + } + + protected void asserteMetrics(QueueMetrics qm, int appsSubmitted, + int appsPending, int appsRunning, int appsCompleted, + int allocatedContainers, int availableMB, int availableVirtualCores, + int allocatedMB, int allocatedVirtualCores) { + assertEquals(appsSubmitted, qm.getAppsSubmitted()); + assertEquals(appsPending, qm.getAppsPending()); + assertEquals(appsRunning, qm.getAppsRunning()); + assertEquals(appsCompleted, qm.getAppsCompleted()); + assertEquals(allocatedContainers, qm.getAllocatedContainers()); + assertEquals(allocatedMB, qm.getAllocatedMB()); + assertEquals(allocatedVirtualCores, qm.getAllocatedVirtualCores()); + } +} + diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestartCapacity.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestartCapacity.java new file mode 100644 index 0000000..d974d13 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestartCapacity.java @@ -0,0 +1,75 @@ +package org.apache.hadoop.yarn.server.resourcemanager; + 
+import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity + .CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity + .CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity + .LeafQueue; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; + +import static org.junit.Assert.assertEquals; + +public class TestWorkPreservingRMRestartCapacity extends TestWorkPreservingRMRestart + +{ + @Override + protected void setScheduler(YarnConfiguration conf) { + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, + DominantResourceCalculator.class.getName()); + } + + @Override + protected void asserteMetrics(QueueMetrics qm, int appsSubmitted, int + appsPending, int appsRunning, int appsCompleted, int allocatedContainers, int availableMB, int availableVirtualCores, int allocatedMB, int allocatedVirtualCores) { + super.asserteMetrics(qm, appsSubmitted, appsPending, appsRunning, + appsCompleted, allocatedContainers, availableMB, + availableVirtualCores, allocatedMB, 
allocatedVirtualCores); + + assertEquals(availableMB, qm.getAvailableMB()); + assertEquals(availableVirtualCores, qm.getAvailableVirtualCores()); + } + + @Override + protected void assertQueueMetrics(MockRM rm2, Queue leafQueue, + Resource nmResource, + RMApp recoveredApp1, Resource usedResources) { + LeafQueue queue = (LeafQueue)leafQueue; + assertEquals(usedResources, queue.getUsedResources()); + assertEquals(2, queue.getNumContainers()); + + ResourceCalculator calc = + ((CapacityScheduler) this.rm2.getResourceScheduler()) + .getResourceCalculator(); + float usedCapacity = + Resources.divide(calc, nmResource, usedResources, nmResource); + assertEquals(usedCapacity, queue.getUsedCapacity(), 0); + assertEquals(usedCapacity, queue.getAbsoluteUsedCapacity(), 0); + assertEquals(usedResources, queue.getUser(recoveredApp1.getUser()) + .getConsumedResources()); + } + + @Override + protected void assertSchedulerAttemptState(ContainerStatus amContainer, + ContainerStatus runningContainer, AbstractYarnScheduler scheduler, Resource usedResources, Resource availableResources, SchedulerApplicationAttempt schedulerAttempt) { + super.assertSchedulerAttemptState(amContainer, runningContainer, + scheduler, usedResources, availableResources, schedulerAttempt); + assertEquals(availableResources, schedulerAttempt.getHeadroom()); + } +}