diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 4a534cf..59e108a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -318,6 +318,13 @@ public static final String RECOVERY_ENABLED = RM_PREFIX + "recovery.enabled"; public static final boolean DEFAULT_RM_RECOVERY_ENABLED = false; + @Private + public static final String RM_WORK_PRESERVING_RECOVERY_ENABLED = RM_PREFIX + + "work-preserving-recovery.enabled"; + @Private + public static final boolean DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED = + false; + /** Zookeeper interaction configs */ public static final String RM_ZK_PREFIX = RM_PREFIX + "zk-"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index c94b782..0c1628e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -270,6 +270,14 @@ + Enable RM work preserving recovery. This configuration is private + to YARN for experimenting the feature. + + yarn.resourcemanager.work-preserving-recovery.enabled + false + + + The class to use as the persistent store. If org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index 79fb5df..517e680 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -99,4 +99,6 @@ void setRMApplicationHistoryWriter( RMApplicationHistoryWriter rmApplicationHistoryWriter); ConfigurationProvider getConfigurationProvider(); + + boolean isWorkPreservingRecoveryEnabled(); } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 1eb4b75..1abc660 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -60,6 +60,7 @@ = new ConcurrentHashMap(); private boolean isHAEnabled; + private boolean isWorkPreservingRecoveryEnabled; private HAServiceState haServiceState = HAServiceProtocol.HAServiceState.INITIALIZING; @@ -329,6 +330,15 @@ public HAServiceState getHAServiceState() { } } + public void setWorkPreservingRecoveryEnabled(boolean enabled) { + this.isWorkPreservingRecoveryEnabled = enabled; + } + + @Override + public boolean isWorkPreservingRecoveryEnabled() { + return this.isWorkPreservingRecoveryEnabled; + } + @Override public RMApplicationHistoryWriter getRMApplicationHistoryWriter() { return rmApplicationHistoryWriter; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index b62bd5f..724dee1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -364,9 +364,15 @@ protected void serviceInit(Configuration configuration) throws Exception { YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED); RMStateStore rmStore = null; - if(isRecoveryEnabled) { + if (isRecoveryEnabled) { recoveryEnabled = true; - rmStore = RMStateStoreFactory.getStore(conf); + rmStore = RMStateStoreFactory.getStore(conf); + boolean isWorkPreservingRecoveryEnabled = + conf.getBoolean( + YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, + YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED); + rmContext + .setWorkPreservingRecoveryEnabled(isWorkPreservingRecoveryEnabled); } else { recoveryEnabled = false; rmStore = new NullRMStateStore(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index a59d1d5..e00eaef 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -243,11 +244,13 @@ public RegisterNodeManagerResponse registerNodeManager( Resource capability = request.getResource(); String nodeManagerVersion = request.getNMVersion(); - if (!request.getNMContainerStatuses().isEmpty()) { - LOG.info("received container statuses on node manager register :" - + request.getNMContainerStatuses()); - for (NMContainerStatus report : request.getNMContainerStatuses()) { - handleNMContainerStatus(report); + if (!rmContext.isWorkPreservingRecoveryEnabled()) { + if (!request.getNMContainerStatuses().isEmpty()) { + LOG.info("received container statuses on node manager register :" + + request.getNMContainerStatuses()); + for (NMContainerStatus status : request.getNMContainerStatuses()) { + handleNMContainerStatus(status); + } } } RegisterNodeManagerResponse response = recordFactory @@ -308,7 +311,7 @@ public RegisterNodeManagerResponse registerNodeManager( RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode); if (oldNode == null) { this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeEvent(nodeId, RMNodeEventType.STARTED)); + new RMNodeStartedEvent(nodeId, request.getNMContainerStatuses())); } else { LOG.info("Reconnect from the node at: " + host); this.nmLivelinessMonitor.unregister(nodeId); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index f4f2e20..bbd135b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -723,29 +723,36 @@ public void transition(RMAppImpl app, RMAppEvent event) { } } + // synchronously recover attempt to ensure any incoming external events + // to be processed after the attempt processes the recover event. + private void recoverAppAttempts() { + for (RMAppAttempt attempt : getAppAttempts().values()) { + attempt.handle(new RMAppAttemptEvent(attempt.getAppAttemptId(), + RMAppAttemptEventType.RECOVER)); + } + } + private static final class RMAppRecoveredTransition implements MultipleArcTransition { @Override public RMAppState transition(RMAppImpl app, RMAppEvent event) { - for (RMAppAttempt attempt : app.getAppAttempts().values()) { - // synchronously recover attempt to ensure any incoming external events - // to be processed after the attempt processes the recover event. - attempt.handle( - new RMAppAttemptEvent(attempt.getAppAttemptId(), - RMAppAttemptEventType.RECOVER)); - } - // The app has completed. if (app.recoveredFinalState != null) { + app.recoverAppAttempts(); new FinalTransition(app.recoveredFinalState).transition(app, event); return app.recoveredFinalState; } - // Last attempt is in final state, do not add to scheduler and just return - // ACCEPTED waiting for last RMAppAttempt to send finished or failed event - // back. + // Notify scheduler about the app on recovery + new AddApplicationToSchedulerTransition().transition(app, event); + + // recover attempts + app.recoverAppAttempts(); + + // Last attempt is in final state, return ACCEPTED waiting for last + // RMAppAttempt to send finished or failed event back. if (app.currentAttempt != null && (app.currentAttempt.getState() == RMAppAttemptState.KILLED || app.currentAttempt.getState() == RMAppAttemptState.FINISHED @@ -754,9 +761,6 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { return RMAppState.ACCEPTED; } - // Notify scheduler about the app on recovery - new AddApplicationToSchedulerTransition().transition(app, event); - // No existent attempts means the attempt associated with this app was not // started or started but not yet saved. if (app.attempts.isEmpty()) { @@ -1055,8 +1059,12 @@ public void transition(RMAppImpl app, RMAppEvent event) { if (app.finishTime == 0 ) { app.finishTime = System.currentTimeMillis(); } - app.handler.handle(new AppRemovedSchedulerEvent(app.applicationId, - finalState)); + // Recovered apps that are completed were not added to scheduler, so no + // need to remove them from scheduler. + if (app.recoveredFinalState == null) { + app.handler.handle(new AppRemovedSchedulerEvent(app.applicationId, + finalState)); + } app.handler.handle( new RMAppManagerEvent(app.applicationId, RMAppManagerEventType.APP_COMPLETED)); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index e289ad5..5b1a17d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -267,15 +267,17 @@ .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING, RMAppAttemptEventType.CONTAINER_FINISHED, new FinalSavingTransition( - new AMContainerCrashedTransition(), RMAppAttemptState.FAILED)) + new AMContainerCrashedBeforeRunningTransition(), RMAppAttemptState.FAILED)) // Transitions from LAUNCHED State .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.RUNNING, RMAppAttemptEventType.REGISTERED, new AMRegisteredTransition()) - .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING, + .addTransition(RMAppAttemptState.LAUNCHED, + EnumSet.of(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING), RMAppAttemptEventType.CONTAINER_FINISHED, - new FinalSavingTransition( - new AMContainerCrashedTransition(), RMAppAttemptState.FAILED)) + new ContainerFinishedTransition( + new AMContainerCrashedBeforeRunningTransition(), + RMAppAttemptState.LAUNCHED)) .addTransition( RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING, RMAppAttemptEventType.EXPIRE, @@ -302,7 +304,9 @@ RMAppAttemptState.RUNNING, EnumSet.of(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING), RMAppAttemptEventType.CONTAINER_FINISHED, - new ContainerFinishedTransition()) + new ContainerFinishedTransition( + new AMContainerCrashedAtRunningTransition(), + RMAppAttemptState.RUNNING)) .addTransition( RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING, RMAppAttemptEventType.EXPIRE, @@ -904,6 +908,12 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, } return appAttempt.recoveredFinalState; } else { + // Add the current attempt to the scheduler. + if (appAttempt.rmContext.isWorkPreservingRecoveryEnabled()) { + appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent( + appAttempt.getAppAttemptId(), false)); + } + /* * Since the application attempt's final state is not saved that means * for AM container (previous attempt) state must be one of these. @@ -1207,17 +1217,16 @@ public void transition(RMAppAttemptImpl appAttempt, } } - private static final class AMContainerCrashedTransition extends + private static final class AMContainerCrashedBeforeRunningTransition extends BaseFinalTransition { - public AMContainerCrashedTransition() { + public AMContainerCrashedBeforeRunningTransition() { super(RMAppAttemptState.FAILED); } @Override public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { - RMAppAttemptContainerFinishedEvent finishEvent = ((RMAppAttemptContainerFinishedEvent)event); @@ -1410,6 +1419,16 @@ public void transition(RMAppAttemptImpl appAttempt, implements MultipleArcTransition { + // The transition To Do after attempt final state is saved. + private BaseTransition transitionToDo; + private RMAppAttemptState currentState; + + public ContainerFinishedTransition(BaseTransition transitionToDo, + RMAppAttemptState currentState) { + this.transitionToDo = transitionToDo; + this.currentState = currentState; + } + @Override public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { @@ -1426,14 +1445,13 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, containerStatus.getContainerId())) { // Remember the follow up transition and save the final attempt state. appAttempt.rememberTargetTransitionsAndStoreState(event, - new ContainerFinishedFinalStateSavedTransition(), - RMAppAttemptState.FAILED, RMAppAttemptState.FAILED); + transitionToDo, RMAppAttemptState.FAILED, RMAppAttemptState.FAILED); return RMAppAttemptState.FINAL_SAVING; } // Normal container.Put it in completedcontainers list appAttempt.justFinishedContainers.add(containerStatus); - return RMAppAttemptState.RUNNING; + return this.currentState; } } @@ -1451,7 +1469,7 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, } } - private static class ContainerFinishedFinalStateSavedTransition extends + private static class AMContainerCrashedAtRunningTransition extends BaseTransition { @Override public void diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java index ace4435..259d68b3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java @@ -33,5 +33,7 @@ RELEASED, // Source: ContainerAllocationExpirer - EXPIRE + EXPIRE, + + RECOVER } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index 2921891..01db215 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -35,12 +35,14 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; +import org.apache.hadoop.yarn.state.MultipleArcTransition; import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; @@ -65,6 +67,9 @@ RMContainerEventType.KILL) .addTransition(RMContainerState.NEW, RMContainerState.RESERVED, RMContainerEventType.RESERVED, new ContainerReservedTransition()) + .addTransition(RMContainerState.NEW, + EnumSet.of(RMContainerState.RUNNING, RMContainerState.COMPLETED), + RMContainerEventType.RECOVER, new ContainerRecoveredTransition()) // Transitions from RESERVED state .addTransition(RMContainerState.RESERVED, RMContainerState.RESERVED, @@ -341,6 +346,38 @@ public void transition(RMContainerImpl cont, RMContainerEvent event) { } } + private static final class ContainerRecoveredTransition + implements + MultipleArcTransition { + @Override + public RMContainerState transition(RMContainerImpl container, + RMContainerEvent event) { + NMContainerStatus report = + ((RMContainerRecoverEvent) event).getContainerReport(); + if (report.getContainerState().equals(ContainerState.COMPLETE)) { + ContainerStatus status = + ContainerStatus.newInstance(report.getContainerId(), + report.getContainerState(), report.getDiagnostics(), + report.getContainerExitStatus()); + + new FinishedTransition().transition(container, + new RMContainerFinishedEvent(container.containerId, status, + RMContainerEventType.FINISHED)); + return RMContainerState.COMPLETED; + } else if (report.getContainerState().equals(ContainerState.RUNNING)) { + // Tell the appAttempt + container.eventHandler.handle(new RMAppAttemptContainerAcquiredEvent( + container.getApplicationAttemptId(), container.getContainer())); + return RMContainerState.RUNNING; + } else { + // This can never happen. + LOG.warn("RMContainer received unexpected recover event with container" + + " state " + report.getContainerState() + " while recovering."); + return RMContainerState.RUNNING; + } + } + } + private static final class ContainerReservedTransition extends BaseTransition { @@ -398,7 +435,6 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { // Inform AppAttempt container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent( container.appAttemptId, finishedEvent.getRemoteContainerStatus())); - container.rmContext.getRMApplicationHistoryWriter() .containerFinished(container); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerRecoverEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerRecoverEvent.java new file mode 100644 index 0000000..338ccbd --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerRecoverEvent.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; + +public class RMContainerRecoverEvent extends RMContainerEvent { + + private final NMContainerStatus containerReport; + + public RMContainerRecoverEvent(ContainerId containerId, + NMContainerStatus containerReport) { + super(containerId, RMContainerEventType.RECOVER); + this.containerReport = containerReport; + } + + public NMContainerStatus getContainerReport() { + return containerReport; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 0d33796..66a7d96 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics; @@ -460,13 +461,9 @@ private void updateMetricsForDeactivatedNode(NodeState initialState, @Override public void transition(RMNodeImpl rmNode, RMNodeEvent event) { // Inform the scheduler + RMNodeStartedEvent startEvent = (RMNodeStartedEvent) event; + List containers = null; - rmNode.context.getDispatcher().getEventHandler().handle( - new NodeAddedSchedulerEvent(rmNode)); - rmNode.context.getDispatcher().getEventHandler().handle( - new NodesListManagerEvent( - NodesListManagerEventType.NODE_USABLE, rmNode)); - String host = rmNode.nodeId.getHost(); if (rmNode.context.getInactiveRMNodes().containsKey(host)) { // Old node rejoining @@ -476,10 +473,17 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { } else { // Increment activeNodes explicitly because this is a new node. ClusterMetrics.getMetrics().incrNumActiveNodes(); + containers = startEvent.getContainerRecoveryReports(); } + + rmNode.context.getDispatcher().getEventHandler() + .handle(new NodeAddedSchedulerEvent(rmNode, containers)); + rmNode.context.getDispatcher().getEventHandler().handle( + new NodesListManagerEvent( + NodesListManagerEventType.NODE_USABLE, rmNode)); } } - + public static class ReconnectNodeTransition implements SingleArcTransition { @@ -513,7 +517,7 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { } rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode); rmNode.context.getDispatcher().getEventHandler().handle( - new RMNodeEvent(newNode.getNodeID(), RMNodeEventType.STARTED)); + new RMNodeStartedEvent(newNode.getNodeID(), null)); } rmNode.context.getDispatcher().getEventHandler().handle( new NodesListManagerEvent( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java new file mode 100644 index 0000000..0414347 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmnode; + +import java.util.List; + +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; + +public class RMNodeStartedEvent extends RMNodeEvent { + + private List containerReports; + + public RMNodeStartedEvent(NodeId nodeId, List containerReports) { + super(nodeId, RMNodeEventType.STARTED); + this.containerReports = containerReports; + } + + public List getContainerRecoveryReports() { + return this.containerReports; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index cc1cb47..9c878fe 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -32,14 +32,21 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.util.resource.Resources; +@SuppressWarnings("unchecked") public abstract class AbstractYarnScheduler extends AbstractService implements ResourceScheduler { @@ -47,8 +54,7 @@ private static final Log LOG = LogFactory.getLog(AbstractYarnScheduler.class); // Nodes in the cluster, indexed by NodeId - protected Map nodes = - new ConcurrentHashMap(); + protected Map nodes = new ConcurrentHashMap(); // Whole capacity of the cluster protected Resource clusterResource = Resource.newInstance(0, 0); @@ -58,6 +64,7 @@ protected RMContext rmContext; protected Map> applications; + protected final static List EMPTY_CONTAINER_LIST = new ArrayList(); protected static final Allocation EMPTY_ALLOCATION = new Allocation( @@ -169,4 +176,90 @@ public String moveApplication(ApplicationId appId, String newQueue) throw new YarnException(getClass().getSimpleName() + " does not support moving apps between queues"); } + + private void killOrphanContainerOnNode(RMNode node, + NMContainerStatus container) { + if (!container.getContainerState().equals(ContainerState.COMPLETE)) { + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeCleanContainerEvent(node.getNodeID(), + container.getContainerId())); + } + } + + public synchronized void recoverContainersOnNode( + List containerReports, RMNode nm) { + if (!rmContext.isWorkPreservingRecoveryEnabled() + || containerReports == null + || (containerReports != null && containerReports.isEmpty())) { + return; + } + + for (NMContainerStatus container : containerReports) { + ApplicationId appId = + container.getContainerId().getApplicationAttemptId().getApplicationId(); + RMApp rmApp = rmContext.getRMApps().get(appId); + if (rmApp == null) { + LOG.error("Skip recovering container " + container + + " for unknown application."); + killOrphanContainerOnNode(nm, container); + continue; + } + + // Unmanaged AM recovery is addressed in YARN-1815 + if (rmApp.getApplicationSubmissionContext().getUnmanagedAM()) { + LOG.info("Skip recovering container " + container + " for unmanaged AM." + + rmApp.getApplicationId()); + killOrphanContainerOnNode(nm, container); + continue; + } + + SchedulerApplication schedulerApp = applications.get(appId); + if (schedulerApp == null) { + LOG.info("Skip recovering container " + container + + " for unknown SchedulerApplication. Application current state is " + + rmApp.getState()); + killOrphanContainerOnNode(nm, container); + continue; + } + + LOG.info("Recovering container " + container); + SchedulerApplicationAttempt schedulerAttempt = + schedulerApp.getCurrentAppAttempt(); + + // create container + RMContainer rmContainer = recoverAndCreateContainer(container, nm); + + // recover RMContainer + rmContainer.handle(new RMContainerRecoverEvent(container.getContainerId(), + container)); + + // recover scheduler node + nodes.get(nm.getNodeID()).recoverContainer(rmContainer); + + // recover queue: update headroom etc. + Queue queue = schedulerAttempt.getQueue(); + queue.recoverContainer(clusterResource, schedulerAttempt, rmContainer); + + // recover scheduler attempt + schedulerAttempt.recoverContainer(rmContainer); + } + } + + private RMContainer recoverAndCreateContainer(NMContainerStatus report, + RMNode node) { + Container container = + Container.newInstance(report.getContainerId(), node.getNodeID(), + node.getHttpAddress(), report.getAllocatedResource(), + report.getPriority(), null); + ApplicationAttemptId attemptId = + container.getId().getApplicationAttemptId(); + RMContainer rmContainer = + new RMContainerImpl(container, attemptId, node.getNodeID(), + applications.get(attemptId.getApplicationId()).getUser(), rmContext); + return rmContainer; + } + + public SchedulerNode getSchedulerNode(NodeId nodeId) { + return nodes.get(nodeId); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index de71f71..d3d03fd 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -39,6 +39,8 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.util.resource.Resources; /** @@ -409,4 +411,25 @@ public synchronized void transferStateFromPreviousAppSchedulingInfo( // this.requests = appInfo.getRequests(); this.blacklist = appInfo.getBlackList(); } + + public synchronized void recoverContainer(RMContainer rmContainer) { + // ContainerIdCounter on recovery will be addressed in YARN-2052 + this.containerIdCounter.incrementAndGet(); + + QueueMetrics metrics = queue.getMetrics(); + if (pending) { + // If there was any container to recover, the application was + // running from scheduler's POV. + pending = false; + metrics.runAppAttempt(applicationId, user); + } + + // Container is completed. Skip recovering resources. + if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { + return; + } + + metrics.allocateResources(user, 1, rmContainer.getAllocatedResource(), + false); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java index c51f819..0bc8ca1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java @@ -26,6 +26,8 @@ import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @Evolving @LimitedPrivate("yarn") @@ -60,4 +62,13 @@ boolean hasAccess(QueueACL acl, UserGroupInformation user); public ActiveUsersManager getActiveUsersManager(); + + /** + * Recover the state of the queue for a given container. + * @param clusterResource the resource of the cluster + * @param schedulerAttempt the application for which the container was allocated + * @param rmContainer the container that was recovered. + */ + public void recoverContainer(Resource clusterResource, + SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index cce2e46..848db41 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.util.resource.Resources; @@ -523,5 +524,24 @@ public synchronized void move(Queue newQueue) { appSchedulingInfo.move(newQueue); this.queue = newQueue; - } + } + + public synchronized void recoverContainer(RMContainer rmContainer) { + // recover app scheduling info + appSchedulingInfo.recoverContainer(rmContainer); + + if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { + return; + } + LOG.info("SchedulerAttempt " + getApplicationAttemptId() + + " is recovering container " + rmContainer.getContainerId()); + liveContainers.put(rmContainer.getContainerId(), rmContainer); + Resources.addTo(currentConsumption, rmContainer.getContainer() + .getResource()); + // resourceLimit: updated when LeafQueue#recoverContainer#allocateResource + // is called. + // newlyAllocatedContainers.add(rmContainer); + // schedulingOpportunities + // lastScheduledContainer + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 85d016b..9fb8d23 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -27,7 +27,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; @@ -35,10 +34,10 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.util.resource.Resources; -import com.google.common.base.Preconditions; /** * Represents a YARN Cluster Node from the viewpoint of the scheduler. @@ -119,13 +118,10 @@ public String getRackName() { * The Scheduler has allocated containers on this node to the given * application. * - * @param applicationId - * application * @param rmContainer * allocated container */ - public synchronized void allocateContainer(ApplicationId applicationId, - RMContainer rmContainer) { + public synchronized void allocateContainer(RMContainer rmContainer) { Container container = rmContainer.getContainer(); deductAvailableResource(container.getResource()); ++numContainers; @@ -166,8 +162,8 @@ public Resource getTotalResource() { return this.totalResourceCapability; } - private synchronized boolean isValidContainer(Container c) { - if (launchedContainers.containsKey(c.getId())) { + public synchronized boolean isValidContainer(ContainerId containerId) { + if (launchedContainers.containsKey(containerId)) { return true; } return false; @@ -185,7 +181,7 @@ private synchronized void updateResource(Container container) { * container to be released */ public synchronized void releaseContainer(Container container) { - if (!isValidContainer(container)) { + if (!isValidContainer(container.getId())) { LOG.error("Invalid container released " + container); return; } @@ -274,4 +270,12 @@ public synchronized RMContainer getReservedContainer() { // we can only adjust available resource if total resource is changed. Resources.addTo(this.availableResource, deltaResource); } + + + public synchronized void recoverContainer(RMContainer rmContainer) { + if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { + return; + } + allocateContainer(rmContainer); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index f5090ba..ccb71e2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -28,7 +28,6 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueState; @@ -235,15 +234,6 @@ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) public ActiveUsersManager getActiveUsersManager(); /** - * Recover the state of the queue - * @param clusterResource the resource of the cluster - * @param application the application for which the container was allocated - * @param container the container that was recovered. - */ - public void recoverContainer(Resource clusterResource, FiCaSchedulerApp application, - Container container); - - /** * Adds all applications in the queue and its subqueues to the given collection. * @param apps the collection to add the applications to */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 7767445..5de407d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -872,6 +872,8 @@ public void handle(SchedulerEvent event) { { NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event; addNode(nodeAddedEvent.getAddedRMNode()); + recoverContainersOnNode(nodeAddedEvent.getContainerReports(), + nodeAddedEvent.getAddedRMNode()); } break; case NODE_REMOVED: diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 5ddb9a4..65938aa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -59,15 +59,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.server.utils.Lock.NoLock; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; +import com.google.common.annotations.VisibleForTesting; + @Private @Unstable public class LeafQueue implements CSQueue { @@ -564,7 +566,8 @@ public String toString() { "numContainers=" + getNumContainers(); } - private synchronized User getUser(String userName) { + @VisibleForTesting + public synchronized User getUser(String userName) { User user = users.get(userName); if (user == null) { user = new User(); @@ -1346,8 +1349,7 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod } // Inform the node - node.allocateContainer(application.getApplicationId(), - allocatedContainer); + node.allocateContainer(allocatedContainer); LOG.info("assignedContainer" + " application attempt=" + application.getApplicationAttemptId() + @@ -1446,7 +1448,7 @@ public void completedContainer(Resource clusterResource, } synchronized void allocateResource(Resource clusterResource, - FiCaSchedulerApp application, Resource resource) { + SchedulerApplicationAttempt application, Resource resource) { // Update queue metrics Resources.addTo(usedResources, resource); CSQueueUtils.updateQueueStatistics( @@ -1530,7 +1532,8 @@ public QueueMetrics getMetrics() { return metrics; } - static class User { + @VisibleForTesting + public static class User { Resource consumed = Resources.createResource(0, 0); int pendingApplications = 0; int activeApplications = 0; @@ -1580,13 +1583,16 @@ public synchronized void releaseContainer(Resource resource) { @Override public void recoverContainer(Resource clusterResource, - FiCaSchedulerApp application, Container container) { + SchedulerApplicationAttempt attempt, RMContainer rmContainer) { + if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { + return; + } // Careful! Locking order is important! synchronized (this) { - allocateResource(clusterResource, application, container.getResource()); + allocateResource(clusterResource, attempt, rmContainer.getContainer() + .getResource()); } - getParent().recoverContainer(clusterResource, application, container); - + getParent().recoverContainer(clusterResource, attempt, rmContainer); } /** @@ -1613,5 +1619,4 @@ public void collectSchedulerApplications( apps.add(app.getApplicationAttemptId()); } } - } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index dba92a6..d83eed3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -38,7 +38,6 @@ import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -49,9 +48,11 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -770,13 +771,16 @@ public QueueMetrics getMetrics() { @Override public void recoverContainer(Resource clusterResource, - FiCaSchedulerApp application, Container container) { + SchedulerApplicationAttempt attempt, RMContainer rmContainer) { + if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { + return; + } // Careful! Locking order is important! synchronized (this) { - allocateResource(clusterResource, container.getResource()); + allocateResource(clusterResource,rmContainer.getContainer().getResource()); } if (parent != null) { - parent.recoverContainer(clusterResource, application, container); + parent.recoverContainer(clusterResource, attempt, rmContainer); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java index 7bab760..5227aac 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java @@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java index c487f48..34ca6e4 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java @@ -18,19 +18,34 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; +import java.util.List; + +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; public class NodeAddedSchedulerEvent extends SchedulerEvent { private final RMNode rmNode; + private final List containerReports; public NodeAddedSchedulerEvent(RMNode rmNode) { super(SchedulerEventType.NODE_ADDED); this.rmNode = rmNode; + this.containerReports = null; + } + + public NodeAddedSchedulerEvent(RMNode rmNode, + List containerReports) { + super(SchedulerEventType.NODE_ADDED); + this.rmNode = rmNode; + this.containerReports = containerReports; } public RMNode getAddedRMNode() { return rmNode; } + public List getContainerReports() { + return containerReports; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java index 32edc8a..b22d3d3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java @@ -264,8 +264,7 @@ private Resource assignContainer(FSSchedulerNode node, } // Inform the node - node.allocateContainer(app.getApplicationId(), - allocatedContainer); + node.allocateContainer(allocatedContainer); // If this container is used to run AM, update the leaf queue's AM usage if (app.getLiveContainers().size() == 1 && diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index cecfbfc..75a170b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.util.resource.Resources; @Private @@ -314,4 +315,10 @@ public void addAMResourceUsage(Resource amResource) { Resources.addTo(amResourceUsage, amResource); } } + + @Override + public void recoverContainer(Resource clusterResource, + SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) { + // TODO Auto-generated method stub + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index 48db414..5ab60af 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -35,7 +35,9 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @Private @Unstable @@ -228,4 +230,11 @@ public ActiveUsersManager getActiveUsersManager() { // Should never be called since all applications are submitted to LeafQueues return null; } + + @Override + public void recoverContainer(Resource clusterResource, + SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) { + // TODO Auto-generated method stub + + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index d461615..4681516 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -76,6 +76,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; @@ -178,6 +179,17 @@ public boolean hasAccess(QueueACL acl, UserGroupInformation user) { public ActiveUsersManager getActiveUsersManager() { return activeUsersManager; } + + @Override + public void recoverContainer(Resource clusterResource, + SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) { + if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { + return; + } + increaseUsedResources(rmContainer); + updateAppHeadRoom(schedulerAttempt); + updateAvailableResourcesMetrics(); + } }; public FifoScheduler() { @@ -488,7 +500,7 @@ private void assignContainers(FiCaSchedulerNode node) { if (attempt == null) { continue; } - attempt.setHeadroom(Resources.subtract(clusterResource, usedResource)); + updateAppHeadRoom(attempt); } } @@ -659,11 +671,10 @@ private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application application.allocate(type, node, priority, request, container); // Inform the node - node.allocateContainer(application.getApplicationId(), - rmContainer); + node.allocateContainer(rmContainer); // Update usage for this container - Resources.addTo(usedResource, capability); + increaseUsedResources(rmContainer); } } @@ -707,9 +718,22 @@ private synchronized void nodeUpdate(RMNode rmNode) { LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = " + node.getAvailableResource()); } - - metrics.setAvailableResourcesToQueue( - Resources.subtract(clusterResource, usedResource)); + + updateAvailableResourcesMetrics(); + } + + private void increaseUsedResources(RMContainer rmContainer) { + Resources.addTo(usedResource, rmContainer.getAllocatedResource()); + } + + private void updateAppHeadRoom(SchedulerApplicationAttempt schedulerAttempt) { + schedulerAttempt.setHeadroom(Resources.subtract(clusterResource, + usedResource)); + } + + private void updateAvailableResourcesMetrics() { + metrics.setAvailableResourcesToQueue(Resources.subtract(clusterResource, + usedResource)); } @Override @@ -719,6 +743,9 @@ public void handle(SchedulerEvent event) { { NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event; addNode(nodeAddedEvent.getAddedRMNode()); + recoverContainersOnNode(nodeAddedEvent.getContainerReports(), + nodeAddedEvent.getAddedRMNode()); + } break; case NODE_REMOVED: @@ -923,4 +950,8 @@ public synchronized boolean checkAccess(UserGroupInformation callerUGI, return null; } } + + public Resource getUsedResource() { + return usedResource; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index caee228..446bbae 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -21,10 +21,9 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.security.PrivilegedAction; +import java.util.List; import java.util.Map; -import org.junit.Assert; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.security.Credentials; @@ -47,12 +46,14 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; @@ -69,6 +70,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -76,6 +81,7 @@ import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.junit.Assert; @SuppressWarnings("unchecked") public class MockRM extends ResourceManager { @@ -144,11 +150,26 @@ public void waitForContainerAllocated(MockNM nm, ContainerId containerId) } } + public void waitForContainerToComplete(RMAppAttempt attempt, + NMContainerStatus completedContainer) throws InterruptedException { + while (true) { + List containers = attempt.getJustFinishedContainers(); + System.out.println("Received completed containers " + containers); + for (ContainerStatus container : containers) { + if (container.getContainerId().equals( + completedContainer.getContainerId())) { + return; + } + } + Thread.sleep(200); + } + } + public void waitForState(MockNM nm, ContainerId containerId, RMContainerState containerState) throws Exception { RMContainer container = getResourceScheduler().getRMContainer(containerId); int timeoutSecs = 0; - while(container == null && timeoutSecs++ < 20) { + while(container == null && timeoutSecs++ < 100) { nm.nodeHeartbeat(true); container = getResourceScheduler().getRMContainer(containerId); System.out.println("Waiting for container " + containerId + " to be allocated."); @@ -333,7 +354,7 @@ public MockNM registerNode(String nodeIdStr, int memory, int vCores) public void sendNodeStarted(MockNM nm) throws Exception { RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get( nm.getNodeId()); - node.handle(new RMNodeEvent(nm.getNodeId(), RMNodeEventType.STARTED)); + node.handle(new RMNodeStartedEvent(nm.getNodeId(), null)); } public void sendNodeLost(MockNM nm) throws Exception { @@ -542,4 +563,12 @@ public ApplicationReport getApplicationReport(ApplicationId appId) .newInstance(appId)); return response.getApplicationReport(); } + + // Explicitly reset queue metrics for testing. + @SuppressWarnings("static-access") + public void clearQueueMetrics(RMApp app) { + ((AbstractYarnScheduler) getResourceScheduler()) + .getSchedulerApplications().get(app.getApplicationId()).getQueue() + .getMetrics().clearQueueMetrics(); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java index f50ae9d..b7b77a7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java @@ -18,16 +18,16 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + import java.util.ArrayList; import java.util.Collections; import java.util.List; -import org.junit.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -56,11 +57,10 @@ import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; - -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; public class TestFifoScheduler { private static final Log LOG = LogFactory.getLog(TestFifoScheduler.class); @@ -298,7 +298,10 @@ public void testReconnectedNode() throws Exception { FifoScheduler fs = new FifoScheduler(); fs.init(conf); fs.start(); + // mock rmContext to avoid NPE. + RMContext context = mock(RMContext.class); fs.reinitialize(conf, null); + fs.setRMContext(context); RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, "127.0.0.2"); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java index b80a6bc..36153de 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java @@ -43,10 +43,11 @@ public class TestMoveApplication { private ResourceManager resourceManager = null; private static boolean failMove; - + private Configuration conf; + @Before public void setUp() throws Exception { - Configuration conf = new YarnConfiguration(); + conf = new YarnConfiguration(); conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoSchedulerWithMove.class, FifoSchedulerWithMove.class); conf.set(YarnConfiguration.YARN_ADMIN_ACL, " "); @@ -119,28 +120,23 @@ public void testMoveTooLate() throws Exception { } } - @Test (timeout = 5000) - public void testMoveSuccessful() throws Exception { - // Submit application - Application application = new Application("user1", resourceManager); - ApplicationId appId = application.getApplicationId(); - application.submit(); - - // Wait for app to be accepted - RMApp app = resourceManager.rmContext.getRMApps().get(appId); - while (app.getState() != RMAppState.ACCEPTED) { - Thread.sleep(100); - } - - ClientRMService clientRMService = resourceManager.getClientRMService(); + @Test (timeout = 10000) + public + void testMoveSuccessful() throws Exception { + MockRM rm1 = new MockRM(conf); + rm1.start(); + RMApp app = rm1.submitApp(1024); + ClientRMService clientRMService = rm1.getClientRMService(); // FIFO scheduler does not support moves - clientRMService.moveApplicationAcrossQueues( - MoveApplicationAcrossQueuesRequest.newInstance(appId, "newqueue")); - - RMApp rmApp = resourceManager.getRMContext().getRMApps().get(appId); + clientRMService + .moveApplicationAcrossQueues(MoveApplicationAcrossQueuesRequest + .newInstance(app.getApplicationId(), "newqueue")); + + RMApp rmApp = rm1.getRMContext().getRMApps().get(app.getApplicationId()); assertEquals("newqueue", rmApp.getQueue()); + rm1.stop(); } - + @Test public void testMoveRejectedByPermissions() throws Exception { failMove = true; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 3bcde8d..1da03fe 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -21,15 +21,14 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.when; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import org.junit.Assert; - import org.apache.hadoop.util.HostsFileReader; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -49,6 +48,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; @@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; @@ -160,7 +161,7 @@ private RMNodeStatusEvent getMockRMNodeStatusEvent() { @Test (timeout = 5000) public void testExpiredContainer() { // Start the node - node.handle(new RMNodeEvent(null, RMNodeEventType.STARTED)); + node.handle(new RMNodeStartedEvent(null, null)); verify(scheduler).handle(any(NodeAddedSchedulerEvent.class)); // Expire a container @@ -188,11 +189,11 @@ public void testExpiredContainer() { @Test (timeout = 5000) public void testContainerUpdate() throws InterruptedException{ //Start the node - node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED)); + node.handle(new RMNodeStartedEvent(null, null)); NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1); RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null); - node2.handle(new RMNodeEvent(null,RMNodeEventType.STARTED)); + node2.handle(new RMNodeStartedEvent(null, null)); ContainerId completedContainerIdFromNode1 = BuilderUtils.newContainerId( BuilderUtils.newApplicationAttemptId( @@ -248,7 +249,7 @@ public void testContainerUpdate() throws InterruptedException{ @Test (timeout = 5000) public void testStatusChange(){ //Start the node - node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED)); + node.handle(new RMNodeStartedEvent(null, null)); //Add info to the queue first node.setNextHeartBeat(false); @@ -464,7 +465,7 @@ private RMNodeImpl getRunningNode(String nmVersion) { RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0, null, ResourceOption.newInstance(capability, RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), nmVersion); - node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED)); + node.handle(new RMNodeStartedEvent(node.getNodeID(), null)); Assert.assertEquals(NodeState.RUNNING, node.getState()); return node; } @@ -495,7 +496,7 @@ public void testAdd() { int initialUnhealthy = cm.getUnhealthyNMs(); int initialDecommissioned = cm.getNumDecommisionedNMs(); int initialRebooted = cm.getNumRebootedNMs(); - node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED)); + node.handle(new RMNodeStartedEvent(node.getNodeID(), null)); Assert.assertEquals("Active Nodes", initialActive + 1, cm.getNumActiveNMs()); Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs()); Assert.assertEquals("Unhealthy Nodes", diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java new file mode 100644 index 0000000..6693d09 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -0,0 +1,570 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@SuppressWarnings({"rawtypes", "unchecked"}) +@RunWith(value = Parameterized.class) +public class TestWorkPreservingRMRestart { + + private YarnConfiguration conf; + private Class schedulerClass; + MockRM rm1 = null; + MockRM rm2 = null; + + @Before + public void setup() throws UnknownHostException { + Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + conf = new YarnConfiguration(); + UserGroupInformation.setConfiguration(conf); + conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + conf.setClass(YarnConfiguration.RM_SCHEDULER, schedulerClass, + ResourceScheduler.class); + conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true); + DefaultMetricsSystem.setMiniClusterMode(true); + } + + @After + public void tearDown() { + if (rm1 != null) { + rm1.stop(); + } + if (rm2 != null) { + rm2.stop(); + } + } + + @Parameterized.Parameters + public static Collection getTestParameters() { + return Arrays.asList(new Object[][] { { CapacityScheduler.class }, + { FifoScheduler.class } }); + } + + public TestWorkPreservingRMRestart(Class schedulerClass) { + this.schedulerClass = schedulerClass; + } + + // Test common scheduler state including SchedulerAttempt, SchedulerNode, + // AppSchedulingInfo can be reconstructed via the container recovery reports + // on NM re-registration. + // Also test scheduler specific changes: i.e. Queue recovery- + // CSQueue/FSQueue/FifoQueue recovery respectively. + // Test Strategy: send 3 container recovery reports(AMContainer, running + // container, completed container) on NM re-registration, check the states of + // SchedulerAttempt, SchedulerNode etc. are updated accordingly. + @Test(timeout = 20000) + public void testSchedulerRecovery() throws Exception { + conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true); + conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, + DominantResourceCalculator.class.getName()); + + int containerMemory = 1024; + Resource containerResource = Resource.newInstance(containerMemory, 1); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + nm1.registerNode(); + RMApp app1 = rm1.submitApp(200); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // clear queue metrics + rm1.clearQueueMetrics(app1); + + // Re-start RM + rm2 = new MockRM(conf, memStore); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + // recover app + RMApp recoveredApp1 = + rm2.getRMContext().getRMApps().get(app1.getApplicationId()); + RMAppAttempt loadedAttempt1 = recoveredApp1.getCurrentAppAttempt(); + NMContainerStatus amContainer = + TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 1, + ContainerState.RUNNING); + NMContainerStatus runningContainer = + TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 2, + ContainerState.RUNNING); + NMContainerStatus completedContainer = + TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3, + ContainerState.COMPLETE); + + nm1.registerNode(Arrays.asList(amContainer, runningContainer, + completedContainer)); + + // Wait for RM to settle down on recovering containers; + waitForNumContainersToRecover(2, rm2, am1.getApplicationAttemptId()); + + // check RMContainers are re-recreated and the container state is correct. + rm2.waitForState(nm1, amContainer.getContainerId(), + RMContainerState.RUNNING); + rm2.waitForState(nm1, runningContainer.getContainerId(), + RMContainerState.RUNNING); + rm2.waitForContainerToComplete(loadedAttempt1, completedContainer); + + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm2.getResourceScheduler(); + SchedulerNode schedulerNode1 = scheduler.getSchedulerNode(nm1.getNodeId()); + + // ********* check scheduler node state.******* + // 2 running containers. + Resource usedResources = Resources.multiply(containerResource, 2); + Resource nmResource = + Resource.newInstance(nm1.getMemory(), nm1.getvCores()); + + assertTrue(schedulerNode1.isValidContainer(amContainer.getContainerId())); + assertTrue(schedulerNode1.isValidContainer(runningContainer + .getContainerId())); + assertFalse(schedulerNode1.isValidContainer(completedContainer + .getContainerId())); + // 2 launched containers, 1 completed container + assertEquals(2, schedulerNode1.getNumContainers()); + + assertEquals(Resources.subtract(nmResource, usedResources), + schedulerNode1.getAvailableResource()); + assertEquals(usedResources, schedulerNode1.getUsedResource()); + Resource availableResources = Resources.subtract(nmResource, usedResources); + + // ***** check queue state based on the underlying scheduler ******** + Map schedulerApps = + ((AbstractYarnScheduler) rm2.getResourceScheduler()) + .getSchedulerApplications(); + SchedulerApplication schedulerApp = + schedulerApps.get(recoveredApp1.getApplicationId()); + + if (schedulerClass.equals(CapacityScheduler.class)) { + checkCSQueue(rm2, schedulerApp, nmResource, nmResource, usedResources, 2); + } else if (schedulerClass.equals(FifoScheduler.class)) { + checkFifoQueue(schedulerApp, usedResources, availableResources); + } + + // *********** check scheduler attempt state.******** + SchedulerApplicationAttempt schedulerAttempt = + schedulerApp.getCurrentAppAttempt(); + assertTrue(schedulerAttempt.getLiveContainers().contains( + scheduler.getRMContainer(amContainer.getContainerId()))); + assertTrue(schedulerAttempt.getLiveContainers().contains( + scheduler.getRMContainer(runningContainer.getContainerId()))); + assertEquals(schedulerAttempt.getCurrentConsumption(), usedResources); + assertEquals(availableResources, schedulerAttempt.getHeadroom()); + + // *********** check appSchedulingInfo state *********** + assertEquals(4, schedulerAttempt.getNewContainerId()); + } + + private void checkCSQueue(MockRM rm, + SchedulerApplication app, + Resource clusterResource, Resource queueResource, Resource usedResource, + int numContainers) + throws Exception { + checkCSLeafQueue(rm2, app, clusterResource, queueResource, usedResource, + numContainers); + + LeafQueue queue = (LeafQueue) app.getQueue(); + Resource availableResources = Resources.subtract(queueResource, usedResource); + // ************* check Queue metrics ************ + QueueMetrics queueMetrics = queue.getMetrics(); + asserteMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(), + availableResources.getVirtualCores(), usedResource.getMemory(), + usedResource.getVirtualCores()); + + // ************ check user metrics *********** + QueueMetrics userMetrics = + queueMetrics.getUserMetrics(app.getUser()); + asserteMetrics(userMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(), + availableResources.getVirtualCores(), usedResource.getMemory(), + usedResource.getVirtualCores()); + } + + private void checkCSLeafQueue(MockRM rm, + SchedulerApplication app, + Resource clusterResource, Resource queueResource, Resource usedResource, + int numContainers) { + LeafQueue leafQueue = (LeafQueue) app.getQueue(); + // assert queue used resources. + assertEquals(usedResource, leafQueue.getUsedResources()); + assertEquals(numContainers, leafQueue.getNumContainers()); + + ResourceCalculator calc = + ((CapacityScheduler) rm.getResourceScheduler()).getResourceCalculator(); + float usedCapacity = + Resources.divide(calc, clusterResource, usedResource, queueResource); + // assert queue used capacity + assertEquals(usedCapacity, leafQueue.getUsedCapacity(), 1e-8); + float absoluteUsedCapacity = + Resources.divide(calc, clusterResource, usedResource, clusterResource); + // assert queue absolute capacity + assertEquals(absoluteUsedCapacity, leafQueue.getAbsoluteUsedCapacity(), + 1e-8); + // assert user consumed resources. + assertEquals(usedResource, leafQueue.getUser(app.getUser()) + .getConsumedResources()); + } + + private void checkFifoQueue(SchedulerApplication schedulerApp, + Resource usedResources, Resource availableResources) throws Exception { + FifoScheduler scheduler = (FifoScheduler) rm2.getResourceScheduler(); + // ************ check cluster used Resources ******** + assertEquals(usedResources, scheduler.getUsedResource()); + + // ************ check app headroom **************** + SchedulerApplicationAttempt schedulerAttempt = + schedulerApp.getCurrentAppAttempt(); + assertEquals(availableResources, schedulerAttempt.getHeadroom()); + + // ************ check queue metrics **************** + QueueMetrics queueMetrics = scheduler.getRootQueueMetrics(); + asserteMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(), + availableResources.getVirtualCores(), usedResources.getMemory(), + usedResources.getVirtualCores()); + } + + // create 3 container reports for AM + public static List + createNMContainerStatusForApp(MockAM am) { + List list = + new ArrayList(); + NMContainerStatus amContainer = + TestRMRestart.createNMContainerStatus(am.getApplicationAttemptId(), 1, + ContainerState.RUNNING); + NMContainerStatus runningContainer = + TestRMRestart.createNMContainerStatus(am.getApplicationAttemptId(), 2, + ContainerState.RUNNING); + NMContainerStatus completedContainer = + TestRMRestart.createNMContainerStatus(am.getApplicationAttemptId(), 3, + ContainerState.COMPLETE); + list.add(amContainer); + list.add(runningContainer); + list.add(completedContainer); + return list; + } + + private static final String R = "Default"; + private static final String A = "QueueA"; + private static final String B = "QueueB"; + private static final String USER_1 = "user1"; + private static final String USER_2 = "user2"; + + private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) { + conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { R }); + final String Q_R = CapacitySchedulerConfiguration.ROOT + "." + R; + conf.setCapacity(Q_R, 100); + final String Q_A = Q_R + "." + A; + final String Q_B = Q_R + "." + B; + conf.setQueues(Q_R, new String[] {A, B}); + conf.setCapacity(Q_A, 50); + conf.setCapacity(Q_B, 50); + conf.setDouble(CapacitySchedulerConfiguration + .MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.5f); + } + + // Test CS recovery with multi-level queues and multi-users: + // 1. setup 2 NMs each with 8GB memory; + // 2. setup 2 level queues: Default -> (QueueA, QueueB) + // 3. User1 submits 2 apps on QueueA + // 4. User2 submits 1 app on QueueB + // 5. AM and each container has 1GB memory + // 6. Restart RM. + // 7. nm1 re-syncs back containers belong to user1 + // 8. nm2 re-syncs back containers belong to user2. + // 9. Assert the parent queue and 2 leaf queues state and the metrics. + // 10. Assert each user's consumption inside the queue. + @Test (timeout = 30000) + public void testCapacitySchedulerRecovery() throws Exception { + if (!schedulerClass.equals(CapacityScheduler.class)) { + return; + } + conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true); + conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, + DominantResourceCalculator.class.getName()); + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(conf); + setupQueueConfiguration(csConf); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(csConf); + rm1 = new MockRM(csConf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + MockNM nm2 = + new MockNM("127.1.1.1:4321", 8192, rm1.getResourceTrackerService()); + nm1.registerNode(); + nm2.registerNode(); + RMApp app1_1 = rm1.submitApp(1024, "app1_1", USER_1, null, A); + MockAM am1_1 = MockRM.launchAndRegisterAM(app1_1, rm1, nm1); + RMApp app1_2 = rm1.submitApp(1024, "app1_2", USER_1, null, A); + MockAM am1_2 = MockRM.launchAndRegisterAM(app1_2, rm1, nm2); + + RMApp app2 = rm1.submitApp(1024, "app2", USER_2, null, B); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2); + + // clear queue metrics + rm1.clearQueueMetrics(app1_1); + rm1.clearQueueMetrics(app1_2); + rm1.clearQueueMetrics(app2); + + // Re-start RM + rm2 = new MockRM(csConf, memStore); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + nm2.setResourceTrackerService(rm2.getResourceTrackerService()); + + List am1_1Containers = + createNMContainerStatusForApp(am1_1); + List am1_2Containers = + createNMContainerStatusForApp(am1_2); + am1_1Containers.addAll(am1_2Containers); + nm1.registerNode(am1_1Containers); + + List am2Containers = + createNMContainerStatusForApp(am2); + nm2.registerNode(am2Containers); + + // Wait for RM to settle down on recovering containers; + waitForNumContainersToRecover(2, rm2, am1_1.getApplicationAttemptId()); + waitForNumContainersToRecover(2, rm2, am1_2.getApplicationAttemptId()); + waitForNumContainersToRecover(2, rm2, am1_2.getApplicationAttemptId()); + + // Calculate each queue's resource usage. + Resource containerResource = Resource.newInstance(1024, 1); + Resource nmResource = + Resource.newInstance(nm1.getMemory(), nm1.getvCores()); + Resource clusterResource = Resources.multiply(nmResource, 2); + Resource q1Resource = Resources.multiply(clusterResource, 0.5); + Resource q2Resource = Resources.multiply(clusterResource, 0.5); + Resource q1UsedResource = Resources.multiply(containerResource, 4); + Resource q2UsedResource = Resources.multiply(containerResource, 2); + Resource totalUsedResource = Resources.add(q1UsedResource, q2UsedResource); + Resource q1availableResources = + Resources.subtract(q1Resource, q1UsedResource); + Resource q2availableResources = + Resources.subtract(q2Resource, q2UsedResource); + Resource totalAvailableResource = + Resources.add(q1availableResources, q2availableResources); + + Map schedulerApps = + ((AbstractYarnScheduler) rm2.getResourceScheduler()) + .getSchedulerApplications(); + SchedulerApplication schedulerApp1_1 = + schedulerApps.get(app1_1.getApplicationId()); + + // assert queue A state. + checkCSLeafQueue(rm2, schedulerApp1_1, clusterResource, q1Resource, + q1UsedResource, 4); + QueueMetrics queue1Metrics = schedulerApp1_1.getQueue().getMetrics(); + asserteMetrics(queue1Metrics, 2, 0, 2, 0, 4, + q1availableResources.getMemory(), q1availableResources.getVirtualCores(), + q1UsedResource.getMemory(), q1UsedResource.getVirtualCores()); + + // assert queue B state. + SchedulerApplication schedulerApp2 = + schedulerApps.get(app2.getApplicationId()); + checkCSLeafQueue(rm2, schedulerApp2, clusterResource, q2Resource, + q2UsedResource, 2); + QueueMetrics queue2Metrics = schedulerApp2.getQueue().getMetrics(); + asserteMetrics(queue2Metrics, 1, 0, 1, 0, 2, + q2availableResources.getMemory(), q2availableResources.getVirtualCores(), + q2UsedResource.getMemory(), q2UsedResource.getVirtualCores()); + + // assert parent queue state. + LeafQueue leafQueue = (LeafQueue) schedulerApp2.getQueue(); + ParentQueue parentQueue = (ParentQueue) leafQueue.getParent(); + checkParentQueue(parentQueue, 6, totalUsedResource, (float) 6 / 16, + (float) 6 / 16); + asserteMetrics(parentQueue.getMetrics(), 3, 0, 3, 0, 6, + totalAvailableResource.getMemory(), + totalAvailableResource.getVirtualCores(), totalUsedResource.getMemory(), + totalUsedResource.getVirtualCores()); + } + + private void checkParentQueue(ParentQueue parentQueue, int numContainers, + Resource usedResource, float UsedCapacity, float absoluteUsedCapacity) { + assertEquals(numContainers, parentQueue.getNumContainers()); + assertEquals(usedResource, parentQueue.getUsedResources()); + assertEquals(UsedCapacity, parentQueue.getUsedCapacity(), 1e-8); + assertEquals(absoluteUsedCapacity, parentQueue.getAbsoluteUsedCapacity(), 1e-8); + } + + // Test RM shuts down, in the meanwhile, AM fails. Restarted RM scheduler + // should not recover the containers that belong to the failed AM. + @Test(timeout = 20000) + public void testAMfailedBetweenRMRestart() throws Exception { + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + nm1.registerNode(); + RMApp app1 = rm1.submitApp(200); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + rm2 = new MockRM(conf, memStore); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + + NMContainerStatus amContainer = + TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 1, + ContainerState.COMPLETE); + NMContainerStatus runningContainer = + TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 2, + ContainerState.RUNNING); + NMContainerStatus completedContainer = + TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3, + ContainerState.COMPLETE); + nm1.registerNode(Arrays.asList(amContainer, runningContainer, + completedContainer)); + rm2.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED); + // Wait for RM to settle down on recovering containers; + Thread.sleep(3000); + + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm2.getResourceScheduler(); + // Previous AM failed, The failed AM should once again release the + // just-recovered containers. + assertNull(scheduler.getRMContainer(runningContainer.getContainerId())); + assertNull(scheduler.getRMContainer(completedContainer.getContainerId())); + } + + // Apps already completed before RM restart. Restarted RM scheduler should not + // recover containers for completed apps. + @Test(timeout = 20000) + public void testContainersNotRecoveredForCompletedApps() throws Exception { + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + nm1.registerNode(); + RMApp app1 = rm1.submitApp(200); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + MockRM.finishAMAndVerifyAppState(app1, rm1, nm1, am1); + + rm2 = new MockRM(conf, memStore); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + NMContainerStatus runningContainer = + TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 2, + ContainerState.RUNNING); + NMContainerStatus completedContainer = + TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3, + ContainerState.COMPLETE); + nm1.registerNode(Arrays.asList(runningContainer, completedContainer)); + RMApp recoveredApp1 = + rm2.getRMContext().getRMApps().get(app1.getApplicationId()); + assertEquals(RMAppState.FINISHED, recoveredApp1.getState()); + + // Wait for RM to settle down on recovering containers; + Thread.sleep(3000); + + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm2.getResourceScheduler(); + + // scheduler should not recover containers for finished apps. + assertNull(scheduler.getRMContainer(runningContainer.getContainerId())); + assertNull(scheduler.getRMContainer(completedContainer.getContainerId())); + } + + private void asserteMetrics(QueueMetrics qm, int appsSubmitted, + int appsPending, int appsRunning, int appsCompleted, + int allocatedContainers, int availableMB, int availableVirtualCores, + int allocatedMB, int allocatedVirtualCores) { + assertEquals(appsSubmitted, qm.getAppsSubmitted()); + assertEquals(appsPending, qm.getAppsPending()); + assertEquals(appsRunning, qm.getAppsRunning()); + assertEquals(appsCompleted, qm.getAppsCompleted()); + assertEquals(allocatedContainers, qm.getAllocatedContainers()); + assertEquals(availableMB, qm.getAvailableMB()); + assertEquals(availableVirtualCores, qm.getAvailableVirtualCores()); + assertEquals(allocatedMB, qm.getAllocatedMB()); + assertEquals(allocatedVirtualCores, qm.getAllocatedVirtualCores()); + } + + private void waitForNumContainersToRecover(int num, MockRM rm, + ApplicationAttemptId attemptId) throws Exception { + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm.getResourceScheduler(); + SchedulerApplicationAttempt attempt = + scheduler.getApplicationAttempt(attemptId); + while (attempt == null) { + System.out.println("Wait for scheduler attempt " + attemptId + + " to be created"); + Thread.sleep(200); + attempt = scheduler.getApplicationAttempt(attemptId); + } + while (attempt.getLiveContainers().size() < num) { + System.out.println("Wait for " + num + " containers to recover."); + Thread.sleep(200); + } + } +}