diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerStatus.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerStatus.java
index 574373c..93c6646 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerStatus.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerStatus.java
@@ -115,4 +115,17 @@ public static ContainerStatus newInstance(ContainerId containerId,
@Private
@Unstable
public abstract void setDiagnostics(String diagnostics);
+
+ /**
+ * Get the allocated Resource of the container.
+ *
+ * @return allocated Resource of the container.
+ */
+ @Public
+ @Unstable
+ public abstract Resource getAllocatedResource();
+
+ @Public
+ @Unstable
+ public abstract void setAllocatedResource(Resource resource);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 0a11948..6298e3a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -318,6 +318,12 @@
public static final String RECOVERY_ENABLED = RM_PREFIX + "recovery.enabled";
public static final boolean DEFAULT_RM_RECOVERY_ENABLED = false;
+ public static final String RM_WORK_PRESERVING_RECOVERY_ENABLED = RM_PREFIX
+ + "work-preserving.recovery.enabled";
+ public static final boolean DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED =
+ false;
+
+
/** Zookeeper interaction configs */
public static final String RM_ZK_PREFIX = RM_PREFIX + "zk-";
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 1d40320..b5d26ab 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -32,7 +32,6 @@
import org.apache.hadoop.util.VersionUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -60,6 +59,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -240,13 +240,6 @@ public RegisterNodeManagerResponse registerNodeManager(
Resource capability = request.getResource();
String nodeManagerVersion = request.getNMVersion();
- if (!request.getContainerStatuses().isEmpty()) {
- LOG.info("received container statuses on node manager register :"
- + request.getContainerStatuses());
- for (ContainerStatus containerStatus : request.getContainerStatuses()) {
- handleContainerStatus(containerStatus);
- }
- }
RegisterNodeManagerResponse response = recordFactory
.newRecordInstance(RegisterNodeManagerResponse.class);
@@ -305,7 +298,7 @@ public RegisterNodeManagerResponse registerNodeManager(
RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
if (oldNode == null) {
this.rmContext.getDispatcher().getEventHandler().handle(
- new RMNodeEvent(nodeId, RMNodeEventType.STARTED));
+ new RMNodeStartEvent(nodeId, request.getContainerStatuses()));
} else {
LOG.info("Reconnect from the node at: " + host);
this.nmLivelinessMonitor.unregister(nodeId);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index f4f2e20..a66c486 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -70,6 +70,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
@@ -763,6 +764,11 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
return RMAppState.SUBMITTED;
}
+ // Directly add the current attempt to the scheduler for work-preserving
+ // RM restart.
+ app.handler.handle(new AppAttemptAddedSchedulerEvent(app.currentAttempt
+ .getAppAttemptId(), false));
+
// YARN-1507 is saving the application state after the application is
// accepted. So after YARN-1507, an app is saved meaning it is accepted.
// Thus we return ACCECPTED state on recovery.
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java
index ace4435..259d68b3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java
@@ -33,5 +33,7 @@
RELEASED,
// Source: ContainerAllocationExpirer
- EXPIRE
+ EXPIRE,
+
+ RECOVER
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index 2921891..a3a2096 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -41,6 +41,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
@@ -65,6 +66,9 @@
RMContainerEventType.KILL)
.addTransition(RMContainerState.NEW, RMContainerState.RESERVED,
RMContainerEventType.RESERVED, new ContainerReservedTransition())
+ .addTransition(RMContainerState.NEW,
+ EnumSet.of(RMContainerState.RUNNING, RMContainerState.COMPLETED),
+ RMContainerEventType.RECOVER, new ContainerRecoveredTransition())
// Transitions from RESERVED state
.addTransition(RMContainerState.RESERVED, RMContainerState.RESERVED,
@@ -341,6 +345,29 @@ public void transition(RMContainerImpl cont, RMContainerEvent event) {
}
}
+ private static final class ContainerRecoveredTransition
+ implements
+ MultipleArcTransition {
+ @Override
+ public RMContainerState transition(RMContainerImpl container,
+ RMContainerEvent event) {
+ ContainerStatus containerStatus = ((RMContainerRecoverEvent) event).getContainerStatus();
+ if (containerStatus.getState().equals(ContainerState.COMPLETE)) {
+ new FinishedTransition().transition(container,
+ new RMContainerFinishedEvent(container.containerId, containerStatus,
+ RMContainerEventType.FINISHED));
+ return RMContainerState.COMPLETED;
+ } else if (containerStatus.getState()
+ .equals(ContainerState.RUNNING)) {
+ return RMContainerState.RUNNING;
+ } else {
+ LOG.warn("RMContainer received unexpected recover event with container"
+ + " state " + containerStatus.getState() + " while recovering.");
+ return RMContainerState.RUNNING;
+ }
+ }
+ }
+
private static final class ContainerReservedTransition extends
BaseTransition {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerRecoverEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerRecoverEvent.java
new file mode 100644
index 0000000..f90723d
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerRecoverEvent.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+
+public class RMContainerRecoverEvent extends RMContainerEvent {
+
+ private final ContainerStatus containerStatus;
+
+ public RMContainerRecoverEvent(ContainerId containerId,
+ ContainerStatus containerStatus) {
+ super(containerId, RMContainerEventType.RECOVER);
+ this.containerStatus = containerStatus;
+ }
+
+ public ContainerStatus getContainerStatus() {
+ return containerStatus;
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index dc53a5d..453a61f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -21,6 +21,8 @@
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -37,7 +39,9 @@
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -54,6 +58,9 @@
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@@ -460,12 +467,8 @@ private void updateMetricsForDeactivatedNode(NodeState initialState,
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
// Inform the scheduler
-
- rmNode.context.getDispatcher().getEventHandler().handle(
- new NodeAddedSchedulerEvent(rmNode));
- rmNode.context.getDispatcher().getEventHandler().handle(
- new NodesListManagerEvent(
- NodesListManagerEventType.NODE_USABLE, rmNode));
+ RMNodeStartEvent startEvent = (RMNodeStartEvent) event;
+ List containerStatuses = null;
String host = rmNode.nodeId.getHost();
if (rmNode.context.getInactiveRMNodes().containsKey(host)) {
@@ -476,10 +479,75 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
} else {
// Increment activeNodes explicitly because this is a new node.
ClusterMetrics.getMetrics().incrNumActiveNodes();
+ containerStatuses =
+ rmNode.handleContainerStatus(startEvent.getContainerStatuses());
}
+
+ rmNode.context.getDispatcher().getEventHandler()
+ .handle(new NodeAddedSchedulerEvent(rmNode, containerStatuses));
+ rmNode.context.getDispatcher().getEventHandler().handle(
+ new NodesListManagerEvent(
+ NodesListManagerEventType.NODE_USABLE, rmNode));
}
}
-
+
+ private List handleContainerStatus(
+ List containerStatuses) {
+ Set finishedApps = new HashSet();
+ for (Iterator i = containerStatuses.iterator(); i
+ .hasNext();) {
+ ContainerStatus containerStatus = i.next();
+ ApplicationAttemptId appAttemptId =
+ containerStatus.getContainerId().getApplicationAttemptId();
+ RMApp rmApp = context.getRMApps().get(appAttemptId.getApplicationId());
+ if (rmApp == null) {
+ LOG.error("Received finished container : "
+ + containerStatus.getContainerId() + "for unknown application "
+ + appAttemptId.getApplicationId() + " Skipping.");
+ i.remove();
+ continue;
+ }
+
+ if (rmApp.getApplicationSubmissionContext().getUnmanagedAM()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ignoring container completion status for unmanaged AM"
+ + rmApp.getApplicationId());
+ }
+ i.remove();
+ continue;
+ }
+
+ Container masterContainer =
+ rmApp.getRMAppAttempt(appAttemptId).getMasterContainer();
+ if (masterContainer.getId().equals(containerStatus.getContainerId())
+ && containerStatus.getState() == ContainerState.COMPLETE) {
+ // Sending master container FinishedEvent to indicate RMAppAttemptImpl
+ // that the attempt has completed.
+ RMAppAttemptContainerFinishedEvent evt =
+ new RMAppAttemptContainerFinishedEvent(appAttemptId,
+ containerStatus);
+ context.getDispatcher().getEventHandler().handle(evt);
+ finishedApps.add(appAttemptId.getApplicationId());
+ }
+ // Do not recover containers for already finished apps.
+ if (finishedApps.contains(appAttemptId.getApplicationId())) {
+ i.remove();
+ }
+ }
+
+ // Do not recover containers for already finished apps.
+ for (Iterator i = containerStatuses.iterator(); i
+ .hasNext();) {
+ ApplicationId appId =
+ i.next().getContainerId().getApplicationAttemptId()
+ .getApplicationId();
+ if (finishedApps.contains(appId)) {
+ i.remove();
+ }
+ }
+ return containerStatuses;
+ }
+
public static class ReconnectNodeTransition implements
SingleArcTransition {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartEvent.java
new file mode 100644
index 0000000..379666d
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartEvent.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+public class RMNodeStartEvent extends RMNodeEvent {
+
+ private List containerStatuses;
+
+ public RMNodeStartEvent(NodeId nodeId, List containerStatuses) {
+ super(nodeId, RMNodeEventType.STARTED);
+ this.containerStatuses = containerStatuses;
+ }
+
+ public List getContainerStatuses() {
+ return this.containerStatuses;
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 0f3af41..fe7bb32 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -22,21 +22,35 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.util.resource.Resources;
public abstract class AbstractYarnScheduler implements ResourceScheduler {
protected RMContext rmContext;
protected Map applications;
+ protected Map nodes =
+ new ConcurrentHashMap();
+
+ // Aggregate capacity of the cluster
+ protected Resource clusterResource = Resource.newInstance(0, 0);
+
protected final static List EMPTY_CONTAINER_LIST =
new ArrayList();
protected static final Allocation EMPTY_ALLOCATION = new Allocation(
@@ -74,4 +88,50 @@ public String moveApplication(ApplicationId appId, String newQueue)
throw new YarnException(getClass().getSimpleName()
+ " does not support moving apps between queues");
}
+
+ public void recoverContainersOnNode(List containerStatuses,
+ RMNode nm) {
+ if (containerStatuses == null
+ || (containerStatuses != null && containerStatuses.isEmpty())) {
+ return;
+ }
+
+ for (ContainerStatus status : containerStatuses) {
+ // create container
+ RMContainer rmContainer = createContainer(status, nm);
+
+ // recover RMContainer
+ rmContainer.handle(new RMContainerRecoverEvent(status.getContainerId(),
+ status));
+
+ // recover scheduler node
+ nodes.get(nm.getNodeID()).recoverContainer(rmContainer);
+
+ SchedulerApplicationAttempt schedulerAttempt =
+ applications.get(
+ rmContainer.getApplicationAttemptId().getApplicationId())
+ .getCurrentAppAttempt();
+
+ // recover queue: update headroom etc.
+ Queue queue = schedulerAttempt.getQueue();
+ queue.recoverContainer(clusterResource, schedulerAttempt, rmContainer);
+
+ // recover scheduler attempt
+ schedulerAttempt.recoverContainer(rmContainer);
+ }
+ }
+
+ public RMContainer createContainer(ContainerStatus status, RMNode node) {
+ Container container =
+ Container.newInstance(status.getContainerId(), node.getNodeID(),
+ node.getHttpAddress(), status.getAllocatedResource(),
+ Priority.newInstance(0), null);
+ ApplicationAttemptId attemptId =
+ container.getId().getApplicationAttemptId();
+ RMContainer rmContainer =
+ new RMContainerImpl(container, attemptId, node.getNodeID(),
+ applications.get(attemptId.getApplicationId()).getUser(), rmContext);
+ return rmContainer;
+ }
+
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index de71f71..05cf19f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -39,6 +39,8 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
@@ -409,4 +411,19 @@ public synchronized void transferStateFromPreviousAppSchedulingInfo(
// this.requests = appInfo.getRequests();
this.blacklist = appInfo.getBlackList();
}
+
+ public synchronized void recoverContainer(RMContainer rmContainer) {
+ containerIdCounter.incrementAndGet();
+ QueueMetrics metrics = queue.getMetrics();
+ if (pending) {
+ // If there was any running containers, the application was
+ // running from scheduler's POV.
+ pending = false;
+ metrics.runAppAttempt(applicationId, user);
+ }
+ if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+ return;
+ }
+ metrics.allocateResources(user, 1, Resource.newInstance(1024, 0), false);
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
index c51f819..5f374f0 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
@@ -26,6 +26,8 @@
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@Evolving
@LimitedPrivate("yarn")
@@ -60,4 +62,7 @@
boolean hasAccess(QueueACL acl, UserGroupInformation user);
public ActiveUsersManager getActiveUsersManager();
+
+ public void recoverContainer(Resource clusterResource,
+ SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index fc7e047..968ef18 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -46,6 +46,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -499,5 +500,24 @@ public synchronized void move(Queue newQueue) {
appSchedulingInfo.move(newQueue);
this.queue = newQueue;
- }
+ }
+
+ public synchronized void recoverContainer(RMContainer rmContainer) {
+ if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+ return;
+ }
+ LOG.info("SchedulerAttempt " + getApplicationAttemptId()
+ + " is recovering container " + rmContainer.getContainerId());
+ liveContainers.put(rmContainer.getContainerId(), rmContainer);
+ Resources.addTo(currentConsumption, rmContainer.getContainer()
+ .getResource());
+// resourceLimit: updated when schedulerNode.recoverContainer is called.
+
+// newlyAllocatedContainers.add(rmContainer);
+// schedulingOpportunities
+// lastScheduledContainer
+
+
+ appSchedulingInfo.recoverContainer(rmContainer);
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index 524b1ab..765b5ca 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
/**
* Represents a YARN Cluster Node from the viewpoint of the scheduler.
@@ -86,4 +87,5 @@
*/
public abstract NodeId getNodeID();
+ public abstract void recoverContainer(RMContainer rmContainer);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index f5090ba..cf237fc 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -240,8 +240,8 @@ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
* @param application the application for which the container was allocated
* @param container the container that was recovered.
*/
- public void recoverContainer(Resource clusterResource, FiCaSchedulerApp application,
- Container container);
+// public void recoverContainer(Resource clusterResource, FiCaSchedulerApp application,
+// Container container);
/**
* Adds all applications in the queue and its subqueues to the given collection.
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index e28c18c..5657e19 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -77,6 +77,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -182,11 +183,6 @@ public Configuration getConf() {
private Map queues = new ConcurrentHashMap();
- private Map nodes =
- new ConcurrentHashMap();
-
- private Resource clusterResource =
- RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
private int numNodeManagers = 0;
private Resource minimumAllocation;
@@ -331,16 +327,16 @@ long getAsyncScheduleInterval() {
static void schedule(CapacityScheduler cs) {
// First randomize the start point
int current = 0;
- Collection nodes = cs.getAllNodes().values();
+ Collection nodes = cs.getAllNodes().values();
int start = random.nextInt(nodes.size());
- for (FiCaSchedulerNode node : nodes) {
+ for (SchedulerNode node : nodes) {
if (current++ >= start) {
- cs.allocateContainersToNode(node);
+ cs.allocateContainersToNode((FiCaSchedulerNode)node);
}
}
// Now, just get everyone to be safe
- for (FiCaSchedulerNode node : nodes) {
- cs.allocateContainersToNode(node);
+ for (SchedulerNode node : nodes) {
+ cs.allocateContainersToNode((FiCaSchedulerNode)node);
}
try {
Thread.sleep(cs.getAsyncScheduleInterval());
@@ -861,6 +857,10 @@ public void handle(SchedulerEvent event) {
{
NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
addNode(nodeAddedEvent.getAddedRMNode());
+
+ // recover containers according to received ContainerStatus on NM register.
+ recoverContainersOnNode(nodeAddedEvent.getContainerStatuses(),
+ nodeAddedEvent.getAddedRMNode());
}
break;
case NODE_REMOVED:
@@ -942,7 +942,7 @@ private synchronized void addNode(RMNode nodeManager) {
}
private synchronized void removeNode(RMNode nodeInfo) {
- FiCaSchedulerNode node = this.nodes.get(nodeInfo.getNodeID());
+ FiCaSchedulerNode node = (FiCaSchedulerNode)this.nodes.get(nodeInfo.getNodeID());
if (node == null) {
return;
}
@@ -1041,11 +1041,11 @@ public ApplicationResourceUsageReport getAppResourceUsageReport(
@Lock(Lock.NoLock.class)
FiCaSchedulerNode getNode(NodeId nodeId) {
- return nodes.get(nodeId);
+ return (FiCaSchedulerNode)nodes.get(nodeId);
}
@Lock(Lock.NoLock.class)
- Map getAllNodes() {
+ Map getAllNodes() {
return nodes;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 971edb8..7379d59 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -59,9 +59,9 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
@@ -1446,7 +1446,7 @@ public void completedContainer(Resource clusterResource,
}
synchronized void allocateResource(Resource clusterResource,
- FiCaSchedulerApp application, Resource resource) {
+ SchedulerApplicationAttempt application, Resource resource) {
// Update queue metrics
Resources.addTo(usedResources, resource);
CSQueueUtils.updateQueueStatistics(
@@ -1580,13 +1580,17 @@ public synchronized void releaseContainer(Resource resource) {
@Override
public void recoverContainer(Resource clusterResource,
- FiCaSchedulerApp application, Container container) {
+ SchedulerApplicationAttempt attempt, RMContainer rmContainer) {
+ if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+ return;
+ }
// Careful! Locking order is important!
synchronized (this) {
- allocateResource(clusterResource, application, container.getResource());
+ computeUserLimitAndSetHeadroom((FiCaSchedulerApp) attempt,
+ clusterResource, rmContainer.getContainer().getResource());
+ allocateResource(clusterResource, attempt, rmContainer.getContainer().getResource());
}
- getParent().recoverContainer(clusterResource, application, container);
-
+ getParent().recoverContainer(clusterResource, attempt, rmContainer);
}
/**
@@ -1613,5 +1617,4 @@ public void collectSchedulerApplications(
apps.add(app.getApplicationAttemptId());
}
}
-
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 1f09475..19f550c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -38,7 +38,6 @@
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -49,9 +48,11 @@
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -770,13 +771,16 @@ public QueueMetrics getMetrics() {
@Override
public void recoverContainer(Resource clusterResource,
- FiCaSchedulerApp application, Container container) {
+ SchedulerApplicationAttempt attempt, RMContainer rmContainer) {
+ if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+ return;
+ }
// Careful! Locking order is important!
synchronized (this) {
- allocateResource(clusterResource, container.getResource());
+ allocateResource(clusterResource,rmContainer.getContainer().getResource());
}
if (parent != null) {
- parent.recoverContainer(clusterResource, application, container);
+ parent.recoverContainer(clusterResource, attempt, rmContainer);
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
index 58f12d0..f1ee8af 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
@@ -35,6 +35,7 @@
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
@@ -282,4 +283,13 @@ public synchronized void applyDeltaOnAvailableResource(Resource deltaResource) {
Resources.addTo(this.availableResource, deltaResource);
}
+ @Override
+ public void recoverContainer(RMContainer rmContainer) {
+ if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+ return;
+ }
+ LOG.info("SchedulerNode " + getNodeID() + " is recovering container "
+ + rmContainer.getContainerId());
+ allocateContainer(null, rmContainer);
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java
index c487f48..825ca78 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java
@@ -18,19 +18,34 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
public class NodeAddedSchedulerEvent extends SchedulerEvent {
private final RMNode rmNode;
+ private final List containerStatuses;
public NodeAddedSchedulerEvent(RMNode rmNode) {
super(SchedulerEventType.NODE_ADDED);
this.rmNode = rmNode;
+ this.containerStatuses = null;
+ }
+
+ public NodeAddedSchedulerEvent(RMNode rmNode,
+ List containerStatuses) {
+ super(SchedulerEventType.NODE_ADDED);
+ this.rmNode = rmNode;
+ this.containerStatuses = containerStatuses;
}
public RMNode getAddedRMNode() {
return rmNode;
}
+ public List getContainerStatuses() {
+ return containerStatuses;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
index e842a6a..b396b07 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
@@ -33,8 +33,10 @@
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
@@ -254,4 +256,11 @@ public int getNumRunnableApps() {
public ActiveUsersManager getActiveUsersManager() {
return activeUsersManager;
}
+
+ @Override
+ public void recoverContainer(Resource clusterResource,
+ SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) {
+ // TODO Auto-generated method stub
+
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
index 427cb86..67e5104 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
@@ -33,7 +33,9 @@
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@Private
@Unstable
@@ -200,4 +202,11 @@ public ActiveUsersManager getActiveUsersManager() {
// Should never be called since all applications are submitted to LeafQueues
return null;
}
+
+ @Override
+ public void recoverContainer(Resource clusterResource,
+ SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) {
+ // TODO Auto-generated method stub
+
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
index 97ea6d4..fcefea8 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
@@ -275,5 +275,11 @@ public synchronized void applyDeltaOnAvailableResource(Resource deltaResource) {
// we can only adjust available resource if total resource is changed.
Resources.addTo(this.availableResource, deltaResource);
}
+
+ @Override
+ public void recoverContainer(RMContainer rmContainer) {
+ // TODO Auto-generated method stub
+
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index 82000e1..f73710c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -79,6 +79,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
@@ -185,6 +186,13 @@ public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
public ActiveUsersManager getActiveUsersManager() {
return activeUsersManager;
}
+
+ @Override
+ public void recoverContainer(Resource clusterResource,
+ SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) {
+ // TODO Auto-generated method stub
+
+ }
};
@Override
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
index cfd05f9..1cb6c31 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
@@ -34,6 +34,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
@@ -140,6 +141,23 @@ public AllocateResponse allocate(
return allocate(reqs, releases);
}
+ public void allocateAndwaitForContainers(int numContainers, int memory,
+ MockNM nm) throws Exception {
+ allocate(nm.getNodeId().getHost(), memory, numContainers,
+ new ArrayList());
+ List containers = new ArrayList();
+ int timeoutSecs = 0;
+ do {
+ nm.nodeHeartbeat(true);
+ containers.addAll(allocate(new ArrayList(),
+ new ArrayList()).getAllocatedContainers());
+ Thread.sleep(200);
+ System.out.println("Attempt " + this.attemptId + " is waiting for "
+ + numContainers + " containers to be allocated. Currently has "
+ + containers.size() + " containers allocated.");
+ } while (containers.size() != numContainers && timeoutSecs++ < 60);
+ }
+
public List createReq(String[] hosts, int memory, int priority,
int containers) throws Exception {
List reqs = new ArrayList();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
index 49eff8b..65f5f01 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
@@ -37,6 +37,7 @@
import java.util.Map;
import java.util.Set;
+import org.apache.commons.logging.Log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -88,6 +89,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -1824,6 +1826,54 @@ protected void serviceStart() throws Exception {
MockAM am1 = MockRM.launchAndRegisterAM(loadedApp0, rm2, nm1);
MockRM.finishAMAndVerifyAppState(loadedApp0, rm2, nm1, am1);
}
+
+
+ // Test scheduler state including SchedulerAttempt, SchedulerNode,
+ // AppSchedulingInfo and Queue state can be reconstructed via the container
+ // statuses reports on NM registration.
+ @Test (timeout = 20000)
+ public void testSchedulerRecoveryOnRMRestart() throws Exception {
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+ MockRM rm1 = new MockRM(conf, memStore);
+ rm1.start();
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+ nm1.registerNode();
+ RMApp app1 = rm1.submitApp(200);
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ int NUM_CONTAINERS = 1;
+ // allocate NUM_CONTAINERS containers and wait for them to be allocated.
+ am1.allocateAndwaitForContainers(NUM_CONTAINERS, 1024, nm1);
+
+ nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
+ ContainerId containerId2 =
+ ContainerId.newInstance(am1.getApplicationAttemptId(), 2);
+ rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
+
+ // Re-start RM
+ MockRM rm2 = new MockRM(conf, memStore);
+ rm2.start();
+ nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+
+ List containerStatuses = new ArrayList();
+ ContainerId masterContainerId = app1.getCurrentAppAttempt()
+ .getMasterContainer().getId();
+ ContainerStatus AMContainer =
+ ContainerStatus.newInstance(masterContainerId, ContainerState.RUNNING,
+ "AM container", 0);
+ ContainerStatus normalRunningContainer =
+ ContainerStatus.newInstance(containerId2, ContainerState.RUNNING,
+ "running normal container", 0);
+ containerStatuses.add(AMContainer);
+ containerStatuses.add(normalRunningContainer);
+
+ nm1.registerNode(containerStatuses);
+
+ rm2.waitForState(nm1, containerId2, RMContainerState.RUNNING);
+ rm2.waitForState(nm1, masterContainerId, RMContainerState.RUNNING);
+ }
private void writeToHostsFile(String... hosts) throws IOException {
if (!hostFile.exists()) {