diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/resources/job_1329348432655_0001_conf.xml hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/resources/job_1329348432655_0001_conf.xml
index e34518c..bd9c9c5 100644
--- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/resources/job_1329348432655_0001_conf.xml
+++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/resources/job_1329348432655_0001_conf.xml
@@ -152,7 +152,6 @@
mapreduce.map.speculativefalse
mapreduce.job.acl-view-job
mapreduce.map.output.key.classorg.apache.hadoop.io.IntWritable
-yarn.ipc.serializer.typeprotocolbuffers
mapreduce.job.end-notification.max.retry.interval5
ftp.blocksize67108864
mapreduce.tasktracker.http.threads40
diff --git hadoop-tools/hadoop-sls/src/main/data/2jobs2min-rumen-jh.json hadoop-tools/hadoop-sls/src/main/data/2jobs2min-rumen-jh.json
index 230e4fd..59ae8d7 100644
--- hadoop-tools/hadoop-sls/src/main/data/2jobs2min-rumen-jh.json
+++ hadoop-tools/hadoop-sls/src/main/data/2jobs2min-rumen-jh.json
@@ -4716,7 +4716,6 @@
"dfs.journalnode.http-address" : "0.0.0.0:8480",
"mapreduce.job.acl-view-job" : " ",
"mapreduce.reduce.shuffle.retry-delay.max.ms" : "60000",
- "yarn.ipc.serializer.type" : "protocolbuffers",
"mapreduce.job.end-notification.max.retry.interval" : "5",
"ftp.blocksize" : "67108864",
"mapreduce.tasktracker.http.threads" : "80",
@@ -4841,7 +4840,7 @@
"yarn.ipc.rpc.class" : "org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC",
"mapreduce.job.name" : "TeraGen",
"kfs.blocksize" : "67108864",
- "yarn.resourcemanager.application-tokens.master-key-rolling-interval-secs" : "86400",
+ "yarn.resourcemanager.am-rm-tokens.master-key-rolling-interval-secs" : "86400",
"mapreduce.job.ubertask.maxmaps" : "9",
"yarn.scheduler.maximum-allocation-mb" : "8192",
"yarn.nodemanager.heartbeat.interval-ms" : "1000",
@@ -9830,7 +9829,6 @@
"dfs.journalnode.http-address" : "0.0.0.0:8480",
"mapreduce.job.acl-view-job" : " ",
"mapreduce.reduce.shuffle.retry-delay.max.ms" : "60000",
- "yarn.ipc.serializer.type" : "protocolbuffers",
"mapreduce.job.end-notification.max.retry.interval" : "5",
"ftp.blocksize" : "67108864",
"mapreduce.tasktracker.http.threads" : "80",
@@ -9955,7 +9953,7 @@
"yarn.ipc.rpc.class" : "org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC",
"mapreduce.job.name" : "TeraGen",
"kfs.blocksize" : "67108864",
- "yarn.resourcemanager.application-tokens.master-key-rolling-interval-secs" : "86400",
+ "yarn.resourcemanager.am-rm-tokens.master-key-rolling-interval-secs" : "86400",
"mapreduce.job.ubertask.maxmaps" : "9",
"yarn.scheduler.maximum-allocation-mb" : "8192",
"yarn.nodemanager.heartbeat.interval-ms" : "1000",
diff --git hadoop-yarn-project/CHANGES.txt hadoop-yarn-project/CHANGES.txt
index 7175b14..9c484c7 100644
--- hadoop-yarn-project/CHANGES.txt
+++ hadoop-yarn-project/CHANGES.txt
@@ -403,6 +403,9 @@ Release 2.6.0 - UNRELEASED
YARN-2565. Fixed RM to not use FileSystemApplicationHistoryStore unless
explicitly set. (Zhijie Shen via jianhe)
+ YARN-2460. Remove obsolete entries from yarn-default.xml (Ray Chiang via
+ aw)
+
Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index a2c3fd0..e642d05 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -31,22 +31,11 @@
- Type of serialization to use.
- yarn.ipc.serializer.type
- protocolbuffers
-
-
-
Factory to create server IPC classes.
yarn.ipc.server.factory.class
- Factory to create IPC exceptions.
- yarn.ipc.exception.factory.class
-
-
-
Factory to create serializeable records.
yarn.ipc.record.factory.class
@@ -163,12 +152,6 @@
- How often should the RM check that the AM is still alive.
- yarn.resourcemanager.amliveliness-monitor.interval-ms
- 1000
-
-
-
Maximum time to wait to establish connection to
ResourceManager.
yarn.resourcemanager.connect.max-wait.ms
@@ -221,12 +204,6 @@
- How often to check that node managers are still alive.
- yarn.resourcemanager.nm.liveness-monitor.interval-ms
- 1000
-
-
-
Path to file with nodes to include.
yarn.resourcemanager.nodes.include-path
@@ -580,7 +557,7 @@
Interval for the roll over for the master key used to generate
application tokens
- yarn.resourcemanager.application-tokens.master-key-rolling-interval-secs
+ yarn.resourcemanager.am-rm-tokens.master-key-rolling-interval-secs
86400
@@ -1091,20 +1068,6 @@
- Max time, in seconds, to wait to establish a connection to RM when NM starts.
- The NM will shutdown if it cannot connect to RM within the specified max time period.
- If the value is set as -1, then NM will retry forever.
- yarn.nodemanager.resourcemanager.connect.wait.secs
- 900
-
-
-
- Time interval, in seconds, between each NM attempt to connect to RM.
- yarn.nodemanager.resourcemanager.connect.retry_interval.secs
- 30
-
-
-
The minimum allowed version of a resourcemanager that a nodemanager will connect to.
The valid values are NONE (no version checking), EqualToNM (the resourcemanager's version is
equal to or greater than the NM version), or a Version String.
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
index 38dfa58..9887acc 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
@@ -30,6 +30,7 @@
NodeAction getNodeAction();
List getContainersToCleanup();
+ List getFinishedContainersPulledByAM();
List getApplicationsToCleanup();
@@ -43,6 +44,10 @@
void setNMTokenMasterKey(MasterKey secretKey);
void addAllContainersToCleanup(List containers);
+
+ // This tells NM to remove finished containers only after the AM
+ // has actually received them in a previous allocate response
+ void addFinishedContainersPulledByAM(List containers);
void addAllApplicationsToCleanup(List applications);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
index 775f95a..e9296f4 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
@@ -46,6 +46,7 @@
boolean viaProto = false;
private List containersToCleanup = null;
+ private List finishedContainersPulledByAM = null;
private List applicationsToCleanup = null;
private MasterKey containerTokenMasterKey = null;
private MasterKey nmTokenMasterKey = null;
@@ -73,6 +74,9 @@ private void mergeLocalToBuilder() {
if (this.applicationsToCleanup != null) {
addApplicationsToCleanupToProto();
}
+ if (this.finishedContainersPulledByAM != null) {
+ addFinishedContainersPulledByAMToProto();
+ }
if (this.containerTokenMasterKey != null) {
builder.setContainerTokenMasterKey(
convertToProtoFormat(this.containerTokenMasterKey));
@@ -199,6 +203,12 @@ public void setDiagnosticsMessage(String diagnosticsMessage) {
return this.containersToCleanup;
}
+ @Override
+ public List getFinishedContainersPulledByAM() {
+ initFinishedContainersPulledByAM();
+ return this.finishedContainersPulledByAM;
+ }
+
private void initContainersToCleanup() {
if (this.containersToCleanup != null) {
return;
@@ -212,6 +222,19 @@ private void initContainersToCleanup() {
}
}
+ private void initFinishedContainersPulledByAM() {
+ if (this.finishedContainersPulledByAM != null) {
+ return;
+ }
+ NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
+ List list = p.getFinishedContainersPulledByAmList();
+ this.finishedContainersPulledByAM = new ArrayList();
+
+ for (ContainerIdProto c : list) {
+ this.finishedContainersPulledByAM.add(convertFromProtoFormat(c));
+ }
+ }
+
@Override
public void addAllContainersToCleanup(
final List containersToCleanup) {
@@ -221,6 +244,15 @@ public void addAllContainersToCleanup(
this.containersToCleanup.addAll(containersToCleanup);
}
+ @Override
+ public void addFinishedContainersPulledByAM(
+ final List finishedContainersPulledByAM) {
+ if (finishedContainersPulledByAM == null)
+ return;
+ initFinishedContainersPulledByAM();
+ this.finishedContainersPulledByAM.addAll(finishedContainersPulledByAM);
+ }
+
private void addContainersToCleanupToProto() {
maybeInitBuilder();
builder.clearContainersToCleanup();
@@ -256,6 +288,41 @@ public void remove() {
builder.addAllContainersToCleanup(iterable);
}
+ private void addFinishedContainersPulledByAMToProto() {
+ maybeInitBuilder();
+ builder.clearFinishedContainersPulledByAm();
+ if (finishedContainersPulledByAM == null)
+ return;
+ Iterable iterable = new Iterable() {
+
+ @Override
+ public Iterator iterator() {
+ return new Iterator() {
+
+ Iterator iter = finishedContainersPulledByAM.iterator();
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public ContainerIdProto next() {
+ return convertToProtoFormat(iter.next());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+
+ }
+ };
+
+ }
+ };
+ builder.addAllFinishedContainersPulledByAm(iterable);
+ }
+
@Override
public List getApplicationsToCleanup() {
initApplicationsToCleanup();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index 29cd64e..600f54d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -58,6 +58,7 @@ message NodeHeartbeatResponseProto {
repeated ApplicationIdProto applications_to_cleanup = 6;
optional int64 nextHeartBeatInterval = 7;
optional string diagnostics_message = 8;
+ repeated ContainerIdProto finished_containers_pulled_by_am = 9;
}
message NMContainerStatusProto {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index a479be2..43770c1 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -311,7 +311,7 @@ public void run() {
public static class NMContext implements Context {
private NodeId nodeId = null;
- private final ConcurrentMap applications =
+ protected final ConcurrentMap applications =
new ConcurrentHashMap();
protected final ConcurrentMap containers =
new ConcurrentSkipListMap();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index b52b0fb..b9feacb 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -104,11 +104,6 @@
// Duration for which to track recently stopped container.
private long durationToTrackStoppedContainers;
- // This is used to track the current completed containers when nodeheartBeat
- // is called. These completed containers will be removed from NM context after
- // nodeHeartBeat succeeds and the response from the nodeHeartBeat is
- // processed.
- private final Set previousCompletedContainers;
private final NodeHealthCheckerService healthChecker;
private final NodeManagerMetrics metrics;
@@ -125,7 +120,6 @@ public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
this.metrics = metrics;
this.recentlyStoppedContainers =
new LinkedHashMap();
- this.previousCompletedContainers = new HashSet();
}
@Override
@@ -331,7 +325,7 @@ protected void registerWithRM()
return appList;
}
- private NodeStatus getNodeStatus(int responseId) {
+ private NodeStatus getNodeStatus(int responseId) throws IOException {
NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus();
nodeHealthStatus.setHealthReport(healthChecker.getHealthReport());
@@ -352,11 +346,18 @@ private NodeStatus getNodeStatus(int responseId) {
// Iterate through the NMContext and clone and get all the containers'
// statuses. If it's a completed container, add into the
- // recentlyStoppedContainers and previousCompletedContainers collections.
+ // recentlyStoppedContainers collection.
@VisibleForTesting
- protected List getContainerStatuses() {
+ protected List getContainerStatuses() throws IOException {
List containerStatuses = new ArrayList();
for (Container container : this.context.getContainers().values()) {
+ ContainerId containerId = container.getContainerId();
+ ApplicationId applicationId = container.getContainerId()
+ .getApplicationAttemptId().getApplicationId();
+ if (!this.context.getApplications().containsKey(applicationId)) {
+ context.getContainers().remove(containerId);
+ continue;
+ }
org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus =
container.cloneAndGetContainerStatus();
containerStatuses.add(containerStatus);
@@ -381,10 +382,17 @@ private NodeStatus getNodeStatus(int responseId) {
}
// These NMContainerStatus are sent on NM registration and used by YARN only.
- private List getNMContainerStatuses() {
+ private List getNMContainerStatuses() throws IOException {
List containerStatuses =
new ArrayList();
for (Container container : this.context.getContainers().values()) {
+ ContainerId containerId = container.getContainerId();
+ ApplicationId applicationId = container.getContainerId()
+ .getApplicationAttemptId().getApplicationId();
+ if (!this.context.getApplications().containsKey(applicationId)) {
+ context.getContainers().remove(containerId);
+ continue;
+ }
NMContainerStatus status =
container.getNMContainerStatus();
containerStatuses.add(status);
@@ -402,26 +410,30 @@ private NodeStatus getNodeStatus(int responseId) {
@Override
public void addCompletedContainer(ContainerId containerId) {
- synchronized (previousCompletedContainers) {
- previousCompletedContainers.add(containerId);
- }
synchronized (recentlyStoppedContainers) {
removeVeryOldStoppedContainersFromCache();
- recentlyStoppedContainers.put(containerId,
- System.currentTimeMillis() + durationToTrackStoppedContainers);
+ if (!recentlyStoppedContainers.containsKey(containerId)) {
+ recentlyStoppedContainers.put(containerId,
+ System.currentTimeMillis() + durationToTrackStoppedContainers);
+ }
}
}
- private void removeCompletedContainersFromContext() {
- synchronized (previousCompletedContainers) {
- if (!previousCompletedContainers.isEmpty()) {
- for (ContainerId containerId : previousCompletedContainers) {
- this.context.getContainers().remove(containerId);
- }
- LOG.info("Removed completed containers from NM context: "
- + previousCompletedContainers);
- previousCompletedContainers.clear();
- }
+ @VisibleForTesting
+ @Private
+ public void removeCompletedContainersFromContext(
+ List<ContainerId> containerIds) throws IOException {
+ Set removedContainers = new HashSet();
+
+ // If the AM has pulled the completedContainer it can be removed
+ for (ContainerId containerId : containerIds) {
+ context.getContainers().remove(containerId);
+ removedContainers.add(containerId);
+ }
+
+ if (!removedContainers.isEmpty()) {
+ LOG.info("Removed completed containers from NM context: " +
+ removedContainers);
}
}
@@ -454,7 +466,7 @@ public boolean isContainerRecentlyStopped(ContainerId containerId) {
return recentlyStoppedContainers.containsKey(containerId);
}
}
-
+
@Override
public void clearFinishedContainersFromCache() {
synchronized (recentlyStoppedContainers) {
@@ -472,11 +484,13 @@ public void removeVeryOldStoppedContainersFromCache() {
while (i.hasNext()) {
ContainerId cid = i.next();
if (recentlyStoppedContainers.get(cid) < currentTime) {
- i.remove();
- try {
- context.getNMStateStore().removeContainer(cid);
- } catch (IOException e) {
- LOG.error("Unable to remove container " + cid + " in store", e);
+ if (!context.getContainers().containsKey(cid)) {
+ i.remove();
+ try {
+ context.getNMStateStore().removeContainer(cid);
+ } catch (IOException e) {
+ LOG.error("Unable to remove container " + cid + " in store", e);
+ }
}
} else {
break;
@@ -542,7 +556,9 @@ public void run() {
// don't want to remove the completed containers before resync
// because these completed containers will be reported back to RM
// when NM re-registers with RM.
- removeCompletedContainersFromContext();
+ // Only remove the cleanedup containers that are acked
+ removeCompletedContainersFromContext(response
+ .getFinishedContainersPulledByAM());
lastHeartBeatID = response.getResponseId();
List containersToCleanup = response
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
index acda2a9..85bafb3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
@@ -58,6 +58,7 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@@ -247,6 +248,10 @@ public RegisterNodeManagerResponse registerNodeManager(
// put the completed container into the context
getNMContext().getContainers().put(
testCompleteContainer.getContainerId(), container);
+ getNMContext().getApplications().put(
+ testCompleteContainer.getContainerId()
+ .getApplicationAttemptId().getApplicationId(),
+ mock(Application.class));
} else {
// second register contains the completed container info.
List statuses =
@@ -382,9 +387,17 @@ protected void rebootNodeStatusUpdaterAndRegisterWithRM() {
if (containersShouldBePreserved) {
Assert.assertFalse(containers.isEmpty());
Assert.assertTrue(containers.containsKey(existingCid));
+ Assert.assertEquals(ContainerState.RUNNING,
+ containers.get(existingCid)
+ .cloneAndGetContainerStatus().getState());
} else {
- // ensure that containers are empty before restart nodeStatusUpdater
- Assert.assertTrue(containers.isEmpty());
+ // ensure that containers are empty or are completed before
+ // restart nodeStatusUpdater
+ if (!containers.isEmpty()) {
+ Assert.assertEquals(ContainerState.COMPLETE,
+ containers.get(existingCid)
+ .cloneAndGetContainerStatus().getState());
+ }
}
super.rebootNodeStatusUpdaterAndRegisterWithRM();
}
@@ -465,7 +478,12 @@ protected void rebootNodeStatusUpdaterAndRegisterWithRM() {
try {
// ensure that containers are empty before restart nodeStatusUpdater
- Assert.assertTrue(containers.isEmpty());
+ if (!containers.isEmpty()) {
+ for (Container container: containers.values()) {
+ Assert.assertEquals(ContainerState.COMPLETE,
+ container.cloneAndGetContainerStatus().getState());
+ }
+ }
super.rebootNodeStatusUpdaterAndRegisterWithRM();
// After this point new containers are free to be launched, except
// containers from previous RM
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
index f2a3a4a..8fb51a3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
@@ -58,6 +58,7 @@
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.RMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -180,7 +181,7 @@ public RegisterNodeManagerResponse registerNodeManager(
Map<ApplicationId, List<ContainerStatus>> map =
new HashMap<ApplicationId, List<ContainerStatus>>();
for (ContainerStatus cs : containers) {
- ApplicationId applicationId =
+ ApplicationId applicationId =
cs.getContainerId().getApplicationAttemptId().getApplicationId();
List appContainers = map.get(applicationId);
if (appContainers == null) {
@@ -205,10 +206,10 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
nodeStatus.setResponseId(heartBeatID++);
Map<ApplicationId, List<ContainerStatus>> appToContainers =
getAppToContainerStatusMap(nodeStatus.getContainersStatuses());
-
+
ApplicationId appId1 = ApplicationId.newInstance(0, 1);
ApplicationId appId2 = ApplicationId.newInstance(0, 2);
-
+
if (heartBeatID == 1) {
Assert.assertEquals(0, nodeStatus.getContainersStatuses().size());
@@ -419,7 +420,7 @@ protected void stopRMProxy() {
}
private class MyNodeManager extends NodeManager {
-
+
private MyNodeStatusUpdater3 nodeStatusUpdater;
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
@@ -433,7 +434,7 @@ public MyNodeStatusUpdater3 getNodeStatusUpdater() {
return this.nodeStatusUpdater;
}
}
-
+
private class MyNodeManager2 extends NodeManager {
public boolean isStopped = false;
private NodeStatusUpdater nodeStatusUpdater;
@@ -467,7 +468,7 @@ protected void serviceStop() throws Exception {
syncBarrier.await(10000, TimeUnit.MILLISECONDS);
}
}
- //
+ //
private class MyResourceTracker2 implements ResourceTracker {
public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
public NodeAction registerNodeAction = NodeAction.NORMAL;
@@ -478,7 +479,7 @@ protected void serviceStop() throws Exception {
public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnException,
IOException {
-
+
RegisterNodeManagerResponse response = recordFactory
.newRecordInstance(RegisterNodeManagerResponse.class);
response.setNodeAction(registerNodeAction );
@@ -493,7 +494,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnException, IOException {
NodeStatus nodeStatus = request.getNodeStatus();
nodeStatus.setResponseId(heartBeatID++);
-
+
NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null,
null, null, null, 1000L);
@@ -501,7 +502,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
return nhResponse;
}
}
-
+
private class MyResourceTracker3 implements ResourceTracker {
public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
public NodeAction registerNodeAction = NodeAction.NORMAL;
@@ -513,7 +514,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
MyResourceTracker3(Context context) {
this.context = context;
}
-
+
@Override
public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnException,
@@ -564,6 +565,14 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
public NodeAction registerNodeAction = NodeAction.NORMAL;
public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
private Context context;
+ private final ContainerStatus containerStatus2 =
+ createContainerStatus(2, ContainerState.RUNNING);
+ private final ContainerStatus containerStatus3 =
+ createContainerStatus(3, ContainerState.COMPLETE);
+ private final ContainerStatus containerStatus4 =
+ createContainerStatus(4, ContainerState.RUNNING);
+ private final ContainerStatus containerStatus5 =
+ createContainerStatus(5, ContainerState.COMPLETE);
public MyResourceTracker4(Context context) {
this.context = context;
@@ -583,6 +592,8 @@ public RegisterNodeManagerResponse registerNodeManager(
@Override
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnException, IOException {
+ List<ContainerId> finishedContainersPulledByAM = new ArrayList
+ <ContainerId>();
try {
if (heartBeatID == 0) {
Assert.assertEquals(request.getNodeStatus().getContainersStatuses()
@@ -594,10 +605,6 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
Assert.assertEquals(statuses.size(), 2);
Assert.assertEquals(context.getContainers().size(), 2);
- ContainerStatus containerStatus2 =
- createContainerStatus(2, ContainerState.RUNNING);
- ContainerStatus containerStatus3 =
- createContainerStatus(3, ContainerState.COMPLETE);
boolean container2Exist = false, container3Exist = false;
for (ContainerStatus status : statuses) {
if (status.getContainerId().equals(
@@ -619,23 +626,14 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
// nodeStatusUpdaterRunnable, otherwise nm just shuts down and the
// test passes.
throw new YarnRuntimeException("Lost the heartbeat response");
- } else if (heartBeatID == 2) {
+ } else if (heartBeatID == 2 || heartBeatID == 3) {
List statuses =
request.getNodeStatus().getContainersStatuses();
Assert.assertEquals(statuses.size(), 4);
Assert.assertEquals(context.getContainers().size(), 4);
- ContainerStatus containerStatus2 =
- createContainerStatus(2, ContainerState.RUNNING);
- ContainerStatus containerStatus3 =
- createContainerStatus(3, ContainerState.COMPLETE);
- ContainerStatus containerStatus4 =
- createContainerStatus(4, ContainerState.RUNNING);
- ContainerStatus containerStatus5 =
- createContainerStatus(5, ContainerState.COMPLETE);
-
- boolean container2Exist = false, container3Exist = false, container4Exist =
- false, container5Exist = false;
+ boolean container2Exist = false, container3Exist = false,
+ container4Exist = false, container5Exist = false;
for (ContainerStatus status : statuses) {
if (status.getContainerId().equals(
containerStatus2.getContainerId())) {
@@ -664,6 +662,24 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
}
Assert.assertTrue(container2Exist && container3Exist
&& container4Exist && container5Exist);
+
+ if (heartBeatID == 3) {
+ finishedContainersPulledByAM.add(containerStatus3.getContainerId());
+ }
+ } else if (heartBeatID == 4) {
+ List statuses =
+ request.getNodeStatus().getContainersStatuses();
+ Assert.assertEquals(statuses.size(), 3);
+ Assert.assertEquals(context.getContainers().size(), 3);
+
+ boolean container3Exist = false;
+ for (ContainerStatus status : statuses) {
+ if (status.getContainerId().equals(
+ containerStatus3.getContainerId())) {
+ container3Exist = true;
+ }
+ }
+ Assert.assertFalse(container3Exist);
}
} catch (AssertionError error) {
error.printStackTrace();
@@ -676,6 +692,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
NodeHeartbeatResponse nhResponse =
YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID,
heartBeatNodeAction, null, null, null, null, 1000L);
+ nhResponse.addFinishedContainersPulledByAM(finishedContainersPulledByAM);
return nhResponse;
}
}
@@ -686,7 +703,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnException,
IOException {
-
+
RegisterNodeManagerResponse response = recordFactory
.newRecordInstance(RegisterNodeManagerResponse.class);
response.setNodeAction(registerNodeAction );
@@ -694,7 +711,7 @@ public RegisterNodeManagerResponse registerNodeManager(
response.setNMTokenMasterKey(createMasterKey());
return response;
}
-
+
@Override
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnException, IOException {
@@ -767,11 +784,11 @@ public void deleteBaseDir() throws IOException {
lfs.delete(new Path(basedir.getPath()), true);
}
- @Test(timeout = 90000)
- public void testRecentlyFinishedContainers() throws Exception {
- NodeManager nm = new NodeManager();
- YarnConfiguration conf = new YarnConfiguration();
- conf.set(
+ @Test(timeout = 90000)
+ public void testRecentlyFinishedContainers() throws Exception {
+ NodeManager nm = new NodeManager();
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.set(
NodeStatusUpdaterImpl.YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS,
"10000");
nm.init(conf);
@@ -780,27 +797,112 @@ public void testRecentlyFinishedContainers() throws Exception {
ApplicationId appId = ApplicationId.newInstance(0, 0);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 0);
- ContainerId cId = ContainerId.newInstance(appAttemptId, 0);
-
-
+ ContainerId cId = ContainerId.newInstance(appAttemptId, 0);
+ nm.getNMContext().getApplications().putIfAbsent(appId,
+ mock(Application.class));
+ nm.getNMContext().getContainers().putIfAbsent(cId, mock(Container.class));
+
nodeStatusUpdater.addCompletedContainer(cId);
Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId));
-
+
+ nm.getNMContext().getContainers().remove(cId);
long time1 = System.currentTimeMillis();
int waitInterval = 15;
while (waitInterval-- > 0
&& nodeStatusUpdater.isContainerRecentlyStopped(cId)) {
- nodeStatusUpdater.removeVeryOldStoppedContainersFromCache();
+ nodeStatusUpdater.removeVeryOldStoppedContainersFromCache();
Thread.sleep(1000);
}
- long time2 = System.currentTimeMillis();
+ long time2 = System.currentTimeMillis();
// By this time the container will be removed from cache. need to verify.
- Assert.assertFalse(nodeStatusUpdater.isContainerRecentlyStopped(cId));
- Assert.assertTrue((time2 - time1) >= 10000 && (time2 -time1) <= 250000);
- }
-
+ Assert.assertFalse(nodeStatusUpdater.isContainerRecentlyStopped(cId));
+ Assert.assertTrue((time2 - time1) >= 10000 && (time2 - time1) <= 250000);
+ }
+ @Test(timeout = 90000)
+ public void testRemovePreviousCompletedContainersFromContext() throws Exception {
+ NodeManager nm = new NodeManager();
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.set(
+ NodeStatusUpdaterImpl
+ .YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS,
+ "10000");
+ nm.init(conf);
+ NodeStatusUpdaterImpl nodeStatusUpdater =
+ (NodeStatusUpdaterImpl) nm.getNodeStatusUpdater();
+ ApplicationId appId = ApplicationId.newInstance(0, 0);
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(appId, 0);
+ ContainerId cId = ContainerId.newInstance(appAttemptId, 1);
+ Token containerToken =
+ BuilderUtils.newContainerToken(cId, "anyHost", 1234, "anyUser",
+ BuilderUtils.newResource(1024, 1), 0, 123,
+ "password".getBytes(), 0);
+ Container anyCompletedContainer = new ContainerImpl(conf, null,
+ null, null, null, null,
+ BuilderUtils.newContainerTokenIdentifier(containerToken)) {
+
+ @Override
+ public ContainerState getCurrentState() {
+ return ContainerState.COMPLETE;
+ }
+ };
+
+ nm.getNMContext().getApplications().putIfAbsent(appId,
+ mock(Application.class));
+ nm.getNMContext().getContainers().put(cId, anyCompletedContainer);
+ Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
+
+ List ackedContainers = new ArrayList();
+ ackedContainers.add(cId);
+
+ nodeStatusUpdater.removeCompletedContainersFromContext(ackedContainers);
+ Assert.assertTrue(nodeStatusUpdater.getContainerStatuses().isEmpty());
+ }
+
+ @Test
+ public void testCleanedupApplicationContainerCleanup() throws IOException {
+ NodeManager nm = new NodeManager();
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.set(NodeStatusUpdaterImpl
+ .YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS,
+ "1000000");
+ nm.init(conf);
+
+ NodeStatusUpdaterImpl nodeStatusUpdater =
+ (NodeStatusUpdaterImpl) nm.getNodeStatusUpdater();
+ ApplicationId appId = ApplicationId.newInstance(0, 0);
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(appId, 0);
+
+ ContainerId cId = ContainerId.newInstance(appAttemptId, 1);
+ Token containerToken =
+ BuilderUtils.newContainerToken(cId, "anyHost", 1234, "anyUser",
+ BuilderUtils.newResource(1024, 1), 0, 123,
+ "password".getBytes(), 0);
+ Container anyCompletedContainer = new ContainerImpl(conf, null,
+ null, null, null, null,
+ BuilderUtils.newContainerTokenIdentifier(containerToken)) {
+
+ @Override
+ public ContainerState getCurrentState() {
+ return ContainerState.COMPLETE;
+ }
+ };
+
+ nm.getNMContext().getApplications().putIfAbsent(appId,
+ mock(Application.class));
+ nm.getNMContext().getContainers().put(cId, anyCompletedContainer);
+
+ Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
+
+ nm.getNMContext().getApplications().remove(appId);
+ nodeStatusUpdater.removeCompletedContainersFromContext(new ArrayList
+ ());
+ Assert.assertEquals(0, nodeStatusUpdater.getContainerStatuses().size());
+ }
+
@Test
public void testNMRegistration() throws InterruptedException {
nm = new NodeManager() {
@@ -860,7 +962,7 @@ public void run() {
nm.stop();
}
-
+
@Test
public void testStopReentrant() throws Exception {
final AtomicInteger numCleanups = new AtomicInteger(0);
@@ -875,7 +977,7 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
myNodeStatusUpdater.resourceTracker = myResourceTracker2;
return myNodeStatusUpdater;
}
-
+
@Override
protected ContainerManagerImpl createContainerManager(Context context,
ContainerExecutor exec, DeletionService del,
@@ -897,7 +999,7 @@ public void cleanUpApplicationsOnNMShutDown() {
YarnConfiguration conf = createNMConfig();
nm.init(conf);
nm.start();
-
+
int waitCount = 0;
while (heartBeatID < 1 && waitCount++ != 200) {
Thread.sleep(500);
@@ -906,7 +1008,7 @@ public void cleanUpApplicationsOnNMShutDown() {
// Meanwhile call stop directly as the shutdown hook would
nm.stop();
-
+
// NM takes a while to reach the STOPPED state.
waitCount = 0;
while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
@@ -1172,9 +1274,13 @@ protected NMContext createNMContext(
nm.start();
int waitCount = 0;
- while (heartBeatID <= 3 && waitCount++ != 20) {
+ while (heartBeatID <= 4 && waitCount++ != 20) {
Thread.sleep(500);
}
+ if (heartBeatID <= 4) {
+ Assert.fail("Failed to get all heartbeats in time, " +
+ "heartbeatID:" + heartBeatID);
+ }
if(assertionFailedInThread.get()) {
Assert.fail("ContainerStatus Backup failed");
}
@@ -1182,7 +1288,7 @@ protected NMContext createNMContext(
}
@Test(timeout = 200000)
- public void testNodeStatusUpdaterRetryAndNMShutdown()
+ public void testNodeStatusUpdaterRetryAndNMShutdown()
throws Exception {
final long connectionWaitSecs = 1000;
final long connectionRetryIntervalMs = 1000;
@@ -1190,7 +1296,7 @@ public void testNodeStatusUpdaterRetryAndNMShutdown()
conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
connectionWaitSecs);
conf.setLong(YarnConfiguration
- .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
+ .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
connectionRetryIntervalMs);
conf.setLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, 5000);
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
@@ -1281,30 +1387,36 @@ public MyNMContext(
} else if (heartBeatID == 1) {
ContainerStatus containerStatus2 =
createContainerStatus(2, ContainerState.RUNNING);
- Container container2 = getMockContainer(containerStatus2);
- containers.put(containerStatus2.getContainerId(), container2);
+ putMockContainer(containerStatus2);
ContainerStatus containerStatus3 =
createContainerStatus(3, ContainerState.COMPLETE);
- Container container3 = getMockContainer(containerStatus3);
- containers.put(containerStatus3.getContainerId(), container3);
+ putMockContainer(containerStatus3);
return containers;
} else if (heartBeatID == 2) {
ContainerStatus containerStatus4 =
createContainerStatus(4, ContainerState.RUNNING);
- Container container4 = getMockContainer(containerStatus4);
- containers.put(containerStatus4.getContainerId(), container4);
+ putMockContainer(containerStatus4);
ContainerStatus containerStatus5 =
createContainerStatus(5, ContainerState.COMPLETE);
- Container container5 = getMockContainer(containerStatus5);
- containers.put(containerStatus5.getContainerId(), container5);
+ putMockContainer(containerStatus5);
+ return containers;
+ } else if (heartBeatID == 3 || heartBeatID == 4) {
return containers;
} else {
containers.clear();
return containers;
}
}
+
+ private void putMockContainer(ContainerStatus containerStatus) {
+ Container container = getMockContainer(containerStatus);
+ containers.put(containerStatus.getContainerId(), container);
+ applications.putIfAbsent(containerStatus.getContainerId()
+ .getApplicationAttemptId().getApplicationId(),
+ mock(Application.class));
+ }
}
public static ContainerStatus createContainerStatus(int id,
@@ -1345,7 +1457,7 @@ private void verifyNodeStartFailure(String errMessage) throws Exception {
throw e;
}
}
-
+
// the service should be stopped
Assert.assertEquals("NM state is wrong!", STATE.STOPPED, nm
.getServiceState());
@@ -1364,7 +1476,7 @@ private YarnConfiguration createNMConfig() {
}
conf.setInt(YarnConfiguration.NM_PMEM_MB, 5 * 1024); // 5GB
conf.set(YarnConfiguration.NM_ADDRESS, localhostAddress + ":12345");
- conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, localhostAddress + ":12346");
+ conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, localhostAddress + ":12346");
conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath());
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
remoteLogsDir.getAbsolutePath());
@@ -1372,7 +1484,7 @@ private YarnConfiguration createNMConfig() {
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
return conf;
}
-
+
private NodeManager getNodeManager(final NodeAction nodeHeartBeatAction) {
return new NodeManager() {
@Override
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 4798120..4222888 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -198,7 +198,7 @@ protected void serviceStop() throws Exception {
*/
@SuppressWarnings("unchecked")
@VisibleForTesting
- void handleNMContainerStatus(NMContainerStatus containerStatus) {
+ void handleNMContainerStatus(NMContainerStatus containerStatus, NodeId nodeId) {
ApplicationAttemptId appAttemptId =
containerStatus.getContainerId().getApplicationAttemptId();
RMApp rmApp =
@@ -229,7 +229,8 @@ void handleNMContainerStatus(NMContainerStatus containerStatus) {
containerStatus.getContainerExitStatus());
// sending master container finished event.
RMAppAttemptContainerFinishedEvent evt =
- new RMAppAttemptContainerFinishedEvent(appAttemptId, status);
+ new RMAppAttemptContainerFinishedEvent(appAttemptId, status,
+ nodeId);
rmContext.getDispatcher().getEventHandler().handle(evt);
}
}
@@ -324,7 +325,7 @@ public RegisterNodeManagerResponse registerNodeManager(
LOG.info("received container statuses on node manager register :"
+ request.getNMContainerStatuses());
for (NMContainerStatus status : request.getNMContainerStatuses()) {
- handleNMContainerStatus(status);
+ handleNMContainerStatus(status, nodeId);
}
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index ff520be..0b8f321 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -1181,7 +1181,7 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
int numberOfFailure = app.getNumFailedAppAttempts();
if (!app.submissionContext.getUnmanagedAM()
&& numberOfFailure < app.maxAppAttempts) {
- boolean transferStateFromPreviousAttempt = false;
+ boolean transferStateFromPreviousAttempt;
RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
transferStateFromPreviousAttempt =
failedEvent.getTransferStateFromPreviousAttempt();
@@ -1191,11 +1191,11 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
// Transfer the state from the previous attempt to the current attempt.
// Note that the previous failed attempt may still be collecting the
// container events from the scheduler and update its data structures
- // before the new attempt is created.
- if (transferStateFromPreviousAttempt) {
- ((RMAppAttemptImpl) app.currentAttempt)
- .transferStateFromPreviousAttempt(oldAttempt);
- }
+ // before the new attempt is created. We always transfer state for
+ // finished containers so that they can be acked to the NM,
+ // but when pulling finished containers we will check this flag again.
+ ((RMAppAttemptImpl) app.currentAttempt)
+ .transferStateFromPreviousAttempt(oldAttempt);
return initialState;
} else {
if (numberOfFailure >= app.maxAppAttempts) {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
index 943a5e5..cf8c2bb 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
import java.util.List;
+import java.util.concurrent.ConcurrentMap;
import javax.crypto.SecretKey;
@@ -31,6 +32,7 @@
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -120,14 +122,29 @@
List pullJustFinishedContainers();
/**
- * Return the list of last set of finished containers. This does not reset the
- * finished containers.
- * @return the list of just finished contianers, this does not reset the
+ * Returns a reference to the map of last set of finished containers to the
+ * corresponding node. This does not reset the finished containers.
+ * @return the map of just finished containers, this does not reset the
* finished containers.
*/
+ ConcurrentMap>
+ getJustFinishedContainersReference();
+
+ /**
+ * Return the list of last set of finished containers. This does not reset
+ * the finished containers.
+ * @return the list of just finished containers
+ */
List getJustFinishedContainers();
/**
+ * The map of containers per Node that are already sent to the AM.
+ * @return map of per node list of finished container status sent to AM
+ */
+ ConcurrentMap>
+ getFinishedContainersSentToAMReference();
+
+ /**
* The container on which the Application Master is running.
* @return the {@link Container} on which the application master is running.
*/
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 7ca57ee..d8db468 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -24,9 +24,12 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -52,6 +55,7 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
@@ -83,6 +87,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@@ -129,9 +134,16 @@
private final ApplicationSubmissionContext submissionContext;
private Token amrmToken = null;
private SecretKey clientTokenMasterKey = null;
-
- private List justFinishedContainers =
- new ArrayList();
+
+ private ConcurrentMap>
+ justFinishedContainers =
+ new ConcurrentHashMap>();
+ // Tracks the previous finished containers that are waiting to be
+ // verified as received by the AM. If the AM sends the next allocate
+ // request it implicitly acks this list.
+ private ConcurrentMap>
+ finishedContainersSentToAM =
+ new ConcurrentHashMap>();
private Container masterContainer;
private float progress = 0;
@@ -627,10 +639,28 @@ public float getProgress() {
}
}
+ @VisibleForTesting
@Override
public List getJustFinishedContainers() {
this.readLock.lock();
try {
+ List returnList = new ArrayList();
+ for (Collection containerStatusList :
+ justFinishedContainers.values()) {
+ returnList.addAll(containerStatusList);
+ }
+ return returnList;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ @Override
+ public ConcurrentMap>
+ getJustFinishedContainersReference
+ () {
+ this.readLock.lock();
+ try {
return this.justFinishedContainers;
} finally {
this.readLock.unlock();
@@ -638,14 +668,67 @@ public float getProgress() {
}
@Override
+ public ConcurrentMap>
+ getFinishedContainersSentToAMReference() {
+ this.readLock.lock();
+ try {
+ return this.finishedContainersSentToAM;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ @Override
public List pullJustFinishedContainers() {
this.writeLock.lock();
try {
- List returnList = new ArrayList(
- this.justFinishedContainers.size());
- returnList.addAll(this.justFinishedContainers);
- this.justFinishedContainers.clear();
+ List returnList = new ArrayList();
+
+ // A new allocate means the AM received the previously sent
+ // finishedContainers. We can ack this to NM now
+ for (NodeId nodeId:finishedContainersSentToAM.keySet()) {
+
+ // Clear and get current values
+ List currentSentContainers =
+ finishedContainersSentToAM
+ .put(nodeId, new ArrayList());
+ List containerIdList = new ArrayList
+ (currentSentContainers.size());
+ for (ContainerStatus containerStatus:currentSentContainers) {
+ containerIdList.add(containerStatus.getContainerId());
+ }
+ eventHandler.handle(new RMNodeFinishedContainersPulledByAMEvent(
+ nodeId, containerIdList));
+ }
+
+ // Mark every containerStatus as being sent to AM though we may return
+ // only the ones that belong to the current attempt
+ boolean keepContainersAcressAttempts = this.submissionContext
+ .getKeepContainersAcrossApplicationAttempts();
+ for (NodeId nodeId:justFinishedContainers.keySet()) {
+
+ // Clear and get current values
+ List finishedContainers = justFinishedContainers.put
+ (nodeId, new ArrayList());
+
+ if (keepContainersAcressAttempts) {
+ returnList.addAll(finishedContainers);
+ } else {
+ // Filter out containers from previous attempt
+ for (ContainerStatus containerStatus: finishedContainers) {
+ if (containerStatus.getContainerId().getApplicationAttemptId()
+ .equals(this.getAppAttemptId())) {
+ returnList.add(containerStatus);
+ }
+ }
+ }
+
+ finishedContainersSentToAM.putIfAbsent(nodeId, new ArrayList
+ ());
+ finishedContainersSentToAM.get(nodeId).addAll(finishedContainers);
+ }
+
return returnList;
} finally {
this.writeLock.unlock();
@@ -732,7 +815,7 @@ public void recover(RMState state) throws Exception {
}
setMasterContainer(attemptState.getMasterContainer());
recoverAppAttemptCredentials(attemptState.getAppAttemptCredentials(),
- attemptState.getState());
+ attemptState.getState());
this.recoveredFinalState = attemptState.getState();
this.originalTrackingUrl = attemptState.getFinalTrackingUrl();
this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl);
@@ -744,7 +827,9 @@ public void recover(RMState state) throws Exception {
}
public void transferStateFromPreviousAttempt(RMAppAttempt attempt) {
- this.justFinishedContainers = attempt.getJustFinishedContainers();
+ this.justFinishedContainers = attempt.getJustFinishedContainersReference();
+ this.finishedContainersSentToAM =
+ attempt.getFinishedContainersSentToAMReference();
}
private void recoverAppAttemptCredentials(Credentials appAttemptTokens,
@@ -1507,6 +1592,9 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
ContainerStatus containerStatus =
containerFinishedEvent.getContainerStatus();
+ // Add all finished containers so that they can be acked to NM
+ addJustFinishedContainer(appAttempt, containerFinishedEvent);
+
// Is this container the AmContainer? If the finished container is same as
// the AMContainer, AppAttempt fails
if (appAttempt.masterContainer != null
@@ -1519,12 +1607,18 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
return RMAppAttemptState.FINAL_SAVING;
}
- // Normal container.Put it in completed containers list
- appAttempt.justFinishedContainers.add(containerStatus);
return this.currentState;
}
}
+ private static void addJustFinishedContainer(RMAppAttemptImpl appAttempt,
+ RMAppAttemptContainerFinishedEvent containerFinishedEvent) {
+ appAttempt.justFinishedContainers.putIfAbsent(containerFinishedEvent
+ .getNodeId(), new ArrayList());
+ appAttempt.justFinishedContainers.get(containerFinishedEvent
+ .getNodeId()).add(containerFinishedEvent.getContainerStatus());
+ }
+
private static final class ContainerFinishedAtFinalStateTransition
extends BaseTransition {
@Override
@@ -1536,7 +1630,7 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
ContainerStatus containerStatus =
containerFinishedEvent.getContainerStatus();
// Normal container. Add it in completed containers list
- appAttempt.justFinishedContainers.add(containerStatus);
+ addJustFinishedContainer(appAttempt, containerFinishedEvent);
}
}
@@ -1569,6 +1663,9 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
ContainerStatus containerStatus =
containerFinishedEvent.getContainerStatus();
+ // Add all finished containers so that they can be acked to NM.
+ addJustFinishedContainer(appAttempt, containerFinishedEvent);
+
// Is this container the ApplicationMaster container?
if (appAttempt.masterContainer.getId().equals(
containerStatus.getContainerId())) {
@@ -1576,8 +1673,7 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
appAttempt, containerFinishedEvent);
return RMAppAttemptState.FINISHED;
}
- // Normal container.
- appAttempt.justFinishedContainers.add(containerStatus);
+
return RMAppAttemptState.FINISHING;
}
}
@@ -1592,6 +1688,9 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
ContainerStatus containerStatus =
containerFinishedEvent.getContainerStatus();
+ // Add all finished containers so that they can be acked to NM.
+ addJustFinishedContainer(appAttempt, containerFinishedEvent);
+
// If this is the AM container, it means the AM container is finished,
// but we are not yet acknowledged that the final state has been saved.
// Thus, we still return FINAL_SAVING state here.
@@ -1611,8 +1710,6 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
appAttempt.eventCausingFinalSaving), RMAppAttemptState.FINISHED);
return;
}
- // Normal container.
- appAttempt.justFinishedContainers.add(containerStatus);
}
}
@@ -1629,7 +1726,7 @@ public AMFinishedAfterFinalSavingTransition(
transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
appAttempt.updateInfoOnAMUnregister(amUnregisteredEvent);
new FinalTransition(RMAppAttemptState.FINISHED).transition(appAttempt,
- event);
+ event);
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerFinishedEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerFinishedEvent.java
index 3660597..39c6f29 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerFinishedEvent.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerFinishedEvent.java
@@ -20,21 +20,27 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
public class RMAppAttemptContainerFinishedEvent extends RMAppAttemptEvent {
private final ContainerStatus containerStatus;
+ private final NodeId nodeId;
public RMAppAttemptContainerFinishedEvent(ApplicationAttemptId appAttemptId,
- ContainerStatus containerStatus) {
+ ContainerStatus containerStatus, NodeId nodeId) {
super(appAttemptId, RMAppAttemptEventType.CONTAINER_FINISHED);
this.containerStatus = containerStatus;
+ this.nodeId = nodeId;
}
public ContainerStatus getContainerStatus() {
return this.containerStatus;
}
+ public NodeId getNodeId() {
+ return this.nodeId;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index 885e864..479734a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -78,13 +78,13 @@
RMContainerEventType.RECOVER, new ContainerRecoveredTransition())
// Transitions from RESERVED state
- .addTransition(RMContainerState.RESERVED, RMContainerState.RESERVED,
+ .addTransition(RMContainerState.RESERVED, RMContainerState.RESERVED,
RMContainerEventType.RESERVED, new ContainerReservedTransition())
- .addTransition(RMContainerState.RESERVED, RMContainerState.ALLOCATED,
+ .addTransition(RMContainerState.RESERVED, RMContainerState.ALLOCATED,
RMContainerEventType.START, new ContainerStartedTransition())
- .addTransition(RMContainerState.RESERVED, RMContainerState.KILLED,
+ .addTransition(RMContainerState.RESERVED, RMContainerState.KILLED,
RMContainerEventType.KILL) // nothing to do
- .addTransition(RMContainerState.RESERVED, RMContainerState.RELEASED,
+ .addTransition(RMContainerState.RESERVED, RMContainerState.RELEASED,
RMContainerEventType.RELEASED) // nothing to do
@@ -100,7 +100,7 @@
.addTransition(RMContainerState.ACQUIRED, RMContainerState.RUNNING,
RMContainerEventType.LAUNCHED, new LaunchedTransition())
.addTransition(RMContainerState.ACQUIRED, RMContainerState.COMPLETED,
- RMContainerEventType.FINISHED, new ContainerFinishedAtAcquiredState())
+ RMContainerEventType.FINISHED, new ContainerFinishedAtAcquiredState())
.addTransition(RMContainerState.ACQUIRED, RMContainerState.RELEASED,
RMContainerEventType.RELEASED, new KillTransition())
.addTransition(RMContainerState.ACQUIRED, RMContainerState.EXPIRED,
@@ -495,7 +495,8 @@ public void transition(RMContainerImpl container, RMContainerEvent event) {
updateAttemptMetrics(container);
container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent(
- container.appAttemptId, finishedEvent.getRemoteContainerStatus()));
+ container.appAttemptId, finishedEvent.getRemoteContainerStatus(),
+ container.getAllocatedNode()));
container.rmContext.getRMApplicationHistoryWriter().containerFinished(
container);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
index c0096b9..b4d0b8b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
@@ -40,6 +40,9 @@
CONTAINER_ALLOCATED,
CLEANUP_CONTAINER,
+ // Source: RMAppAttempt
+ FINISHED_CONTAINERS_PULLED_BY_AM,
+
// Source: NMLivelinessMonitor
EXPIRE
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeFinishedContainersPulledByAMEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeFinishedContainersPulledByAMEvent.java
new file mode 100644
index 0000000..a4fb707
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeFinishedContainersPulledByAMEvent.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+import java.util.List;
+
+// Happens after an implicit ack from AM that the container completion has
+// been notified successfully to the AM
+public class RMNodeFinishedContainersPulledByAMEvent extends RMNodeEvent {
+
+ private List containers;
+
+ public RMNodeFinishedContainersPulledByAMEvent(NodeId nodeId,
+ List containers) {
+ super(nodeId, RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM);
+ this.containers = containers;
+ }
+
+ public List getContainers() {
+ return this.containers;
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 1265958..f0ae826 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -112,6 +112,10 @@
private final Set containersToClean = new TreeSet(
new ContainerIdComparator());
+  /* set of containers that were notified to AM about their completion */
+  private final Set<ContainerId> finishedContainersPulledByAM =
+      new HashSet<ContainerId>();
+
/* the list of applications that have finished and need to be purged */
private final List finishedApplications = new ArrayList();
@@ -135,7 +139,7 @@
new UpdateNodeResourceWhenUnusableTransition())
//Transitions from RUNNING state
- .addTransition(NodeState.RUNNING,
+ .addTransition(NodeState.RUNNING,
EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY),
RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition())
.addTransition(NodeState.RUNNING, NodeState.DECOMMISSIONED,
@@ -152,29 +156,39 @@
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
+ RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
+ new FinishedContainersPulledByAMTransition())
+ .addTransition(NodeState.RUNNING, NodeState.RUNNING,
RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenRunningTransition())
//Transitions from REBOOTED state
.addTransition(NodeState.REBOOTED, NodeState.REBOOTED,
- RMNodeEventType.RESOURCE_UPDATE,
+ RMNodeEventType.RESOURCE_UPDATE,
new UpdateNodeResourceWhenUnusableTransition())
//Transitions from DECOMMISSIONED state
.addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED,
- RMNodeEventType.RESOURCE_UPDATE,
+ RMNodeEventType.RESOURCE_UPDATE,
new UpdateNodeResourceWhenUnusableTransition())
-
+ .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED,
+ RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
+ new FinishedContainersPulledByAMTransition())
+
//Transitions from LOST state
.addTransition(NodeState.LOST, NodeState.LOST,
- RMNodeEventType.RESOURCE_UPDATE,
+ RMNodeEventType.RESOURCE_UPDATE,
new UpdateNodeResourceWhenUnusableTransition())
+ .addTransition(NodeState.LOST, NodeState.LOST,
+ RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
+ new FinishedContainersPulledByAMTransition())
//Transitions from UNHEALTHY state
- .addTransition(NodeState.UNHEALTHY,
+ .addTransition(NodeState.UNHEALTHY,
EnumSet.of(NodeState.UNHEALTHY, NodeState.RUNNING),
- RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenUnHealthyTransition())
+ RMNodeEventType.STATUS_UPDATE,
+ new StatusUpdateWhenUnHealthyTransition())
.addTransition(NodeState.UNHEALTHY, NodeState.DECOMMISSIONED,
RMNodeEventType.DECOMMISSION,
new DeactivateNodeTransition(NodeState.DECOMMISSIONED))
@@ -192,7 +206,10 @@
RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
.addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenUnusableTransition())
-
+ .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
+ RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
+ new FinishedContainersPulledByAMTransition())
+
// create the topology tables
.installTopology();
@@ -365,8 +382,11 @@ public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response
response.addAllContainersToCleanup(
new ArrayList(this.containersToClean));
response.addAllApplicationsToCleanup(this.finishedApplications);
+ response.addFinishedContainersPulledByAM(
+ new ArrayList(this.finishedContainersPulledByAM));
this.containersToClean.clear();
this.finishedApplications.clear();
+ this.finishedContainersPulledByAM.clear();
} finally {
this.writeLock.unlock();
}
@@ -652,6 +672,16 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
}
}
+  /**
+   * Adds the container ids carried by a
+   * {@link RMNodeFinishedContainersPulledByAMEvent} to the node's
+   * finishedContainersPulledByAM set. The node state itself is not changed
+   * by this transition.
+   */
+  public static class FinishedContainersPulledByAMTransition implements
+      SingleArcTransition<RMNodeImpl, RMNodeEvent> {
+
+    @Override
+    public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
+      // This transition is registered only for
+      // FINISHED_CONTAINERS_PULLED_BY_AM, so the downcast is expected to hold.
+      RMNodeFinishedContainersPulledByAMEvent pulledEvent =
+          (RMNodeFinishedContainersPulledByAMEvent) event;
+      rmNode.finishedContainersPulledByAM.addAll(pulledEvent.getContainers());
+    }
+  }
+
public static class DeactivateNodeTransition
implements SingleArcTransition {
@@ -726,7 +756,7 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
new ArrayList();
for (ContainerStatus remoteContainer : statusEvent.getContainers()) {
ContainerId containerId = remoteContainer.getContainerId();
-
+
// Don't bother with containers already scheduled for cleanup, or for
// applications already killed. The scheduler doens't need to know any
// more about this container
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index 115f0b4..877a122 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -491,7 +491,7 @@ public void testHandleContainerStatusInvalidCompletions() throws Exception {
ApplicationAttemptId.newInstance(app.getApplicationId(), 2), 1),
ContainerState.COMPLETE, Resource.newInstance(1024, 1),
"Dummy Completed", 0, Priority.newInstance(10), 1234);
- rm.getResourceTrackerService().handleNMContainerStatus(report);
+ rm.getResourceTrackerService().handleNMContainerStatus(report, null);
verify(handler, never()).handle((Event) any());
// Case 1.2: Master container is null
@@ -502,7 +502,7 @@ public void testHandleContainerStatusInvalidCompletions() throws Exception {
ContainerId.newInstance(currentAttempt.getAppAttemptId(), 0),
ContainerState.COMPLETE, Resource.newInstance(1024, 1),
"Dummy Completed", 0, Priority.newInstance(10), 1234);
- rm.getResourceTrackerService().handleNMContainerStatus(report);
+ rm.getResourceTrackerService().handleNMContainerStatus(report, null);
verify(handler, never()).handle((Event)any());
// Case 2: Managed AM
@@ -515,7 +515,7 @@ public void testHandleContainerStatusInvalidCompletions() throws Exception {
ContainerState.COMPLETE, Resource.newInstance(1024, 1),
"Dummy Completed", 0, Priority.newInstance(10), 1234);
try {
- rm.getResourceTrackerService().handleNMContainerStatus(report);
+ rm.getResourceTrackerService().handleNMContainerStatus(report, null);
} catch (Exception e) {
// expected - ignore
}
@@ -530,7 +530,7 @@ public void testHandleContainerStatusInvalidCompletions() throws Exception {
ContainerState.COMPLETE, Resource.newInstance(1024, 1),
"Dummy Completed", 0, Priority.newInstance(10), 1234);
try {
- rm.getResourceTrackerService().handleNMContainerStatus(report);
+ rm.getResourceTrackerService().handleNMContainerStatus(report, null);
} catch (Exception e) {
// expected - ignore
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
index b8e6f43..15028f9 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
@@ -28,6 +28,7 @@
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -35,6 +36,7 @@
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -91,6 +93,11 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent;
+
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
@@ -151,6 +158,7 @@
private NMTokenSecretManagerInRM nmTokenManager =
spy(new NMTokenSecretManagerInRM(conf));
private boolean transferStateFromPreviousAttempt = false;
+ private EventHandler rmnodeEventHandler;
private final class TestApplicationAttemptEventDispatcher implements
EventHandler {
@@ -203,7 +211,7 @@ public void handle(AMLauncherEvent event) {
applicationMasterLauncher.handle(event);
}
}
-
+
private static int appId = 1;
private ApplicationSubmissionContext submissionContext = null;
@@ -268,6 +276,9 @@ public void setUp() throws Exception {
rmDispatcher.register(AMLauncherEventType.class,
new TestAMLauncherEventDispatcher());
+ rmnodeEventHandler = mock(RMNodeImpl.class);
+ rmDispatcher.register(RMNodeEventType.class, rmnodeEventHandler);
+
rmDispatcher.init(conf);
rmDispatcher.start();
@@ -575,6 +586,8 @@ private void testAppAttemptFinishedState(Container container,
}
assertEquals(finishedContainerCount, applicationAttempt
.getJustFinishedContainers().size());
+ Assert.assertEquals(0, getFinishedContainersSentToAM(applicationAttempt)
+ .size());
assertEquals(container, applicationAttempt.getMasterContainer());
assertEquals(finalStatus, applicationAttempt.getFinalApplicationStatus());
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
@@ -704,7 +717,8 @@ private void testUnmanagedAMSuccess(String url) {
application.handle(new RMAppRunningOnNodeEvent(application.getApplicationId(),
container.getNodeId()));
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
- applicationAttempt.getAppAttemptId(), mock(ContainerStatus.class)));
+ applicationAttempt.getAppAttemptId(), mock(ContainerStatus.class),
+ container.getNodeId()));
// complete AM
String diagnostics = "Successful";
FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
@@ -752,10 +766,11 @@ public void testUsageReport() {
when(appResUsgRpt.getMemorySeconds()).thenReturn(223456L);
when(appResUsgRpt.getVcoreSeconds()).thenReturn(75544L);
sendAttemptUpdateSavedEvent(applicationAttempt);
+ NodeId anyNodeId = NodeId.newInstance("host", 1234);
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
attemptId,
ContainerStatus.newInstance(
- amContainer.getId(), ContainerState.COMPLETE, "", 0)));
+ amContainer.getId(), ContainerState.COMPLETE, "", 0), anyNodeId));
when(scheduler.getSchedulerAppInfo(eq(attemptId))).thenReturn(null);
@@ -857,8 +872,9 @@ public void testAMCrashAtScheduled() {
SchedulerUtils.LOST_CONTAINER);
// send CONTAINER_FINISHED event at SCHEDULED state,
// The state should be FINAL_SAVING with previous state SCHEDULED
+ NodeId anyNodeId = NodeId.newInstance("host", 1234);
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
- applicationAttempt.getAppAttemptId(), cs));
+ applicationAttempt.getAppAttemptId(), cs, anyNodeId));
// createApplicationAttemptState will return previous state (SCHEDULED),
// if the current state is FINAL_SAVING.
assertEquals(YarnApplicationAttemptState.SCHEDULED,
@@ -904,8 +920,9 @@ public void testAMCrashAtAllocated() {
ContainerStatus cs =
BuilderUtils.newContainerStatus(amContainer.getId(),
ContainerState.COMPLETE, containerDiagMsg, exitCode);
+ NodeId anyNodeId = NodeId.newInstance("host", 1234);
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
- applicationAttempt.getAppAttemptId(), cs));
+ applicationAttempt.getAppAttemptId(), cs, anyNodeId));
assertEquals(YarnApplicationAttemptState.ALLOCATED,
applicationAttempt.createApplicationAttemptState());
sendAttemptUpdateSavedEvent(applicationAttempt);
@@ -928,16 +945,17 @@ public void testRunningToFailed() {
ContainerStatus cs = BuilderUtils.newContainerStatus(amContainer.getId(),
ContainerState.COMPLETE, containerDiagMsg, exitCode);
ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId();
+ NodeId anyNodeId = NodeId.newInstance("host", 1234);
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
- appAttemptId, cs));
+ appAttemptId, cs, anyNodeId));
// ignored ContainerFinished and Expire at FinalSaving if we were supposed
// to Failed state.
assertEquals(RMAppAttemptState.FINAL_SAVING,
- applicationAttempt.getAppAttemptState());
+ applicationAttempt.getAppAttemptState());
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus(
- amContainer.getId(), ContainerState.COMPLETE, "", 0)));
+ amContainer.getId(), ContainerState.COMPLETE, "", 0), anyNodeId));
applicationAttempt.handle(new RMAppAttemptEvent(
applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE));
assertEquals(RMAppAttemptState.FINAL_SAVING,
@@ -947,7 +965,7 @@ public void testRunningToFailed() {
sendAttemptUpdateSavedEvent(applicationAttempt);
assertEquals(RMAppAttemptState.FAILED,
applicationAttempt.getAppAttemptState());
- assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
+ assertEquals(2, applicationAttempt.getJustFinishedContainers().size());
assertEquals(amContainer, applicationAttempt.getMasterContainer());
assertEquals(0, application.getRanNodes().size());
String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app",
@@ -971,10 +989,11 @@ public void testRunningToKilled() {
// ignored ContainerFinished and Expire at FinalSaving if we were supposed
// to Killed state.
assertEquals(RMAppAttemptState.FINAL_SAVING,
- applicationAttempt.getAppAttemptState());
+ applicationAttempt.getAppAttemptState());
+ NodeId anyNodeId = NodeId.newInstance("host", 1234);
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus(
- amContainer.getId(), ContainerState.COMPLETE, "", 0)));
+ amContainer.getId(), ContainerState.COMPLETE, "", 0), anyNodeId));
applicationAttempt.handle(new RMAppAttemptEvent(
applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE));
assertEquals(RMAppAttemptState.FINAL_SAVING,
@@ -984,7 +1003,7 @@ public void testRunningToKilled() {
sendAttemptUpdateSavedEvent(applicationAttempt);
assertEquals(RMAppAttemptState.KILLED,
applicationAttempt.getAppAttemptState());
- assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
+ assertEquals(1,applicationAttempt.getJustFinishedContainers().size());
assertEquals(amContainer, applicationAttempt.getMasterContainer());
assertEquals(0, application.getRanNodes().size());
String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app",
@@ -1144,13 +1163,14 @@ public void testFinishingToFinishing() {
unregisterApplicationAttempt(amContainer, finalStatus, trackingUrl,
diagnostics);
// container must be AM container to move from FINISHING to FINISHED
+ NodeId anyNodeId = NodeId.newInstance("host", 1234);
applicationAttempt.handle(
new RMAppAttemptContainerFinishedEvent(
applicationAttempt.getAppAttemptId(),
BuilderUtils.newContainerStatus(
BuilderUtils.newContainerId(
applicationAttempt.getAppAttemptId(), 42),
- ContainerState.COMPLETE, "", 0)));
+ ContainerState.COMPLETE, "", 0), anyNodeId));
testAppAttemptFinishingState(amContainer, finalStatus, trackingUrl,
diagnostics);
}
@@ -1165,13 +1185,14 @@ public void testSuccessfulFinishingToFinished() {
String diagnostics = "Successful";
unregisterApplicationAttempt(amContainer, finalStatus, trackingUrl,
diagnostics);
+ NodeId anyNodeId = NodeId.newInstance("host", 1234);
applicationAttempt.handle(
new RMAppAttemptContainerFinishedEvent(
applicationAttempt.getAppAttemptId(),
BuilderUtils.newContainerStatus(amContainer.getId(),
- ContainerState.COMPLETE, "", 0)));
+ ContainerState.COMPLETE, "", 0), anyNodeId));
testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl,
- diagnostics, 0, false);
+ diagnostics, 1, false);
}
// While attempt is at FINAL_SAVING, Contaienr_Finished event may come before
@@ -1195,15 +1216,16 @@ public void testSuccessfulFinishingToFinished() {
assertEquals(YarnApplicationAttemptState.RUNNING,
applicationAttempt.createApplicationAttemptState());
// Container_finished event comes before Attempt_Saved event.
+ NodeId anyNodeId = NodeId.newInstance("host", 1234);
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
applicationAttempt.getAppAttemptId(), BuilderUtils.newContainerStatus(
- amContainer.getId(), ContainerState.COMPLETE, "", 0)));
+ amContainer.getId(), ContainerState.COMPLETE, "", 0), anyNodeId));
assertEquals(RMAppAttemptState.FINAL_SAVING,
applicationAttempt.getAppAttemptState());
// send attempt_saved
sendAttemptUpdateSavedEvent(applicationAttempt);
testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl,
- diagnostics, 0, false);
+ diagnostics, 1, false);
}
// While attempt is at FINAL_SAVING, Expire event may come before
@@ -1235,6 +1257,71 @@ public void testFinalSavingToFinishedWithExpire() {
diagnostics, 0, false);
}
+  /**
+   * Follows one finished container through the acknowledgement cycle: it
+   * first appears in justFinishedContainers, the first pull moves it to the
+   * finished-containers-sent-to-AM bookkeeping without any node event, and
+   * the second pull emits an RMNodeFinishedContainersPulledByAMEvent with its
+   * id and leaves both collections empty.
+   */
+  @Test
+  public void testFinishedContainer() {
+    Container amContainer = allocateApplicationAttempt();
+    launchApplicationAttempt(amContainer);
+    runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
+
+    // Complete one container
+    ContainerId containerId1 = BuilderUtils.newContainerId(applicationAttempt
+        .getAppAttemptId(), 2);
+    Container container1 = mock(Container.class);
+    ContainerStatus containerStatus1 = mock(ContainerStatus.class);
+    when(container1.getId()).thenReturn(
+        containerId1);
+    when(containerStatus1.getContainerId()).thenReturn(containerId1);
+    when(container1.getNodeId()).thenReturn(NodeId.newInstance("host", 1234));
+
+    application.handle(new RMAppRunningOnNodeEvent(application
+        .getApplicationId(),
+        container1.getNodeId()));
+    applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
+        applicationAttempt.getAppAttemptId(), containerStatus1,
+        container1.getNodeId()));
+
+    // Typed captor: getValue() must return the event type so that
+    // getContainers() can be called on it below.
+    ArgumentCaptor<RMNodeFinishedContainersPulledByAMEvent> captor =
+        ArgumentCaptor.forClass(RMNodeFinishedContainersPulledByAMEvent.class);
+
+    // Verify justFinishedContainers
+    Assert.assertEquals(1, applicationAttempt.getJustFinishedContainers()
+        .size());
+    Assert.assertEquals(container1.getId(), applicationAttempt
+        .getJustFinishedContainers().get(0).getContainerId());
+    Assert.assertEquals(0, getFinishedContainersSentToAM(applicationAttempt)
+        .size());
+
+    // Verify finishedContainersSentToAM gets container after pull
+    List<ContainerStatus> containerStatuses = applicationAttempt
+        .pullJustFinishedContainers();
+    Assert.assertEquals(1, containerStatuses.size());
+    Mockito.verify(rmnodeEventHandler, never()).handle(Mockito
+        .any(RMNodeEvent.class));
+    Assert.assertTrue(applicationAttempt.getJustFinishedContainers().isEmpty());
+    Assert.assertEquals(1, getFinishedContainersSentToAM(applicationAttempt)
+        .size());
+
+    // Verify container is acked to NM via the RMNodeEvent after second pull
+    containerStatuses = applicationAttempt.pullJustFinishedContainers();
+    Assert.assertEquals(0, containerStatuses.size());
+    Mockito.verify(rmnodeEventHandler).handle(captor.capture());
+    Assert.assertEquals(container1.getId(), captor.getValue().getContainers()
+        .get(0));
+    Assert.assertTrue(applicationAttempt.getJustFinishedContainers().isEmpty());
+    Assert.assertEquals(0, getFinishedContainersSentToAM(applicationAttempt)
+        .size());
+  }
+
+  /**
+   * Flattens the attempt's finished-containers-sent-to-AM map into a single
+   * list by concatenating every value list.
+   *
+   * @param applicationAttempt attempt whose bookkeeping map is read
+   * @return a new list holding all statuses from all map values; empty when
+   *         the map has no entries
+   */
+  private static List<ContainerStatus> getFinishedContainersSentToAM(
+      RMAppAttempt applicationAttempt) {
+    List<ContainerStatus> containers = new ArrayList<ContainerStatus>();
+    for (List<ContainerStatus> containerStatuses : applicationAttempt
+        .getFinishedContainersSentToAMReference().values()) {
+      containers.addAll(containerStatuses);
+    }
+    return containers;
+  }
+
// this is to test user can get client tokens only after the client token
// master key is saved in the state store and also registered in
// ClientTokenSecretManager
@@ -1281,8 +1368,9 @@ public void testFailedToFailed() {
ContainerStatus.newInstance(amContainer.getId(),
ContainerState.COMPLETE, "some error", 123);
ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId();
+ NodeId anyNodeId = NodeId.newInstance("host", 1234);
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
- appAttemptId, cs1));
+ appAttemptId, cs1, anyNodeId));
assertEquals(YarnApplicationAttemptState.RUNNING,
applicationAttempt.createApplicationAttemptState());
sendAttemptUpdateSavedEvent(applicationAttempt);
@@ -1293,15 +1381,21 @@ public void testFailedToFailed() {
verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
// failed attempt captured the container finished event.
- assertEquals(0, applicationAttempt.getJustFinishedContainers().size());
+ assertEquals(1, applicationAttempt.getJustFinishedContainers().size());
ContainerStatus cs2 =
ContainerStatus.newInstance(ContainerId.newInstance(appAttemptId, 2),
ContainerState.COMPLETE, "", 0);
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
- appAttemptId, cs2));
- assertEquals(1, applicationAttempt.getJustFinishedContainers().size());
- assertEquals(cs2.getContainerId(), applicationAttempt
- .getJustFinishedContainers().get(0).getContainerId());
+ appAttemptId, cs2, anyNodeId));
+ assertEquals(2, applicationAttempt.getJustFinishedContainers().size());
+ boolean found = false;
+ for (ContainerStatus containerStatus:applicationAttempt
+ .getJustFinishedContainers()) {
+ if (cs2.getContainerId().equals(containerStatus.getContainerId())) {
+ found = true;
+ }
+ }
+ assertTrue(found);
}
@@ -1322,8 +1416,9 @@ public void testContainersCleanupForLastAttempt() {
ContainerStatus.newInstance(amContainer.getId(),
ContainerState.COMPLETE, "some error", 123);
ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId();
+ NodeId anyNodeId = NodeId.newInstance("host", 1234);
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
- appAttemptId, cs1));
+ appAttemptId, cs1, anyNodeId));
assertEquals(YarnApplicationAttemptState.RUNNING,
applicationAttempt.createApplicationAttemptState());
sendAttemptUpdateSavedEvent(applicationAttempt);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java
index b3dc35f..a0d5d84 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java
@@ -161,7 +161,7 @@ public void testTokenExpiry() throws Exception {
.getEventHandler()
.handle(
new RMAppAttemptContainerFinishedEvent(applicationAttemptId,
- containerStatus));
+ containerStatus, nm1.getNodeId()));
// Make sure the RMAppAttempt is at Finished State.
// Both AMRMToken and ClientToAMToken have been removed.