commit 0ed5390e6cd699c63f6592dbf41e30f908d35931 Author: Jian He Date: Mon Jan 6 10:41:15 2014 -0800 YARN-1041 diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java index 9a3cb24..1f239ea 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java @@ -865,4 +865,9 @@ public ApplicationResourceUsageReport getAppResourceUsageReport( public RMContainer getRMContainer(ContainerId containerId) { return null; } + + @Override + public List getAllRunningContainers(ApplicationId applicationId) { + return null; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java index 9c817b3..74b8682 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.api.protocolrecords; import java.nio.ByteBuffer; +import java.util.List; import java.util.Map; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -27,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.util.Records; @@ -51,12 +53,14 @@ @Unstable public static RegisterApplicationMasterResponse newInstance( Resource minCapability, Resource maxCapability, - Map acls, ByteBuffer key) { + Map acls, ByteBuffer key, + List runningContainers) { RegisterApplicationMasterResponse response = Records.newRecord(RegisterApplicationMasterResponse.class); response.setMaximumResourceCapability(maxCapability); response.setApplicationACLs(acls); response.setClientToAMTokenMasterKey(key); + response.setRunningContainers(runningContainers); return response; } @@ -105,4 +109,12 @@ public static RegisterApplicationMasterResponse newInstance( @Public @Stable public abstract void setClientToAMTokenMasterKey(ByteBuffer key); + + @Public + @Stable + public abstract List getRunningContainers(); + + @Private + @Unstable + public abstract void setRunningContainers(List runningContainers); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index a4631d1..5206867 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -44,6 +44,7 @@ message RegisterApplicationMasterResponseProto { optional ResourceProto maximumCapability = 1; optional bytes client_to_am_token_master_key = 2; repeated ApplicationACLMapProto application_ACLs = 3; + repeated ContainerProto running_containers = 4; } message FinishApplicationMasterRequestProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java index 486304c..214ae83 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java @@ -20,6 +20,7 @@ import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -29,11 +30,15 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProtoOrBuilder; @@ -52,6 +57,7 @@ private Resource maximumResourceCapability; private Map applicationACLS = null; + private List runningContainers = null; public RegisterApplicationMasterResponsePBImpl() { builder = RegisterApplicationMasterResponseProto.newBuilder(); @@ -105,6 +111,9 @@ private void mergeLocalToBuilder() { if (this.applicationACLS != null) { addApplicationACLs(); } + if (this.runningContainers != null) { + addRunningContainersToProto(); + } } @@ -226,6 +235,43 @@ public ByteBuffer getClientToAMTokenMasterKey() { ByteBuffer.wrap(builder.getClientToAmTokenMasterKey().toByteArray()); return key; } + + @Override + public List getRunningContainers() { + if (this.runningContainers != null) { + return this.runningContainers; + } + initRunningContainersList(); + return this.runningContainers; + } + + @Override + public void setRunningContainers(final List containers) { + if (containers == null) { + return; + } + this.runningContainers = new ArrayList(); + this.runningContainers.addAll(containers); + } + + private void initRunningContainersList() { + RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getRunningContainersList(); + runningContainers = new ArrayList(); + for (ContainerProto c : list) { + runningContainers.add(convertFromProtoFormat(c)); + } + } + + private void addRunningContainersToProto() { + maybeInitBuilder(); + builder.clearRunningContainers(); + List list = new ArrayList(); + for (Container c : runningContainers) { + list.add(convertToProtoFormat(c)); + } + builder.addAllRunningContainers(list); + } private Resource convertFromProtoFormat(ResourceProto resource) { return new ResourcePBImpl(resource); @@ -235,4 +281,11 @@ private ResourceProto convertToProtoFormat(Resource resource) { return ((ResourcePBImpl)resource).getProto(); } + private ContainerPBImpl convertFromProtoFormat(ContainerProto p) { + return new ContainerPBImpl(p); + } + + private ContainerProto convertToProtoFormat(Container t) { + return ((ContainerPBImpl) t).getProto(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 2fd1951..add648c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.PreemptionContainer; @@ -271,6 +272,12 @@ public RegisterApplicationMasterResponse registerApplicationMaster( .getClientToAMTokenSecretManager() .getMasterKey(applicationAttemptId).getEncoded())); } + + if (!app.getApplicationSubmissionContext().getCleanContainersWhenFail()) { + List containerList = + rScheduler.getAllRunningContainers(appID); + response.setRunningContainers(containerList); + } return response; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index 4f1cb74..b98b602 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -28,7 +28,9 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.QueueACL; @@ -180,4 +182,11 @@ boolean checkAccess(UserGroupInformation callerUGI, @LimitedPrivate("yarn") @Unstable public RMContainer getRMContainer(ContainerId containerId); + + /** + * Get all the running containers for the given application. + */ + @LimitedPrivate("yarn") + @Unstable + public List getAllRunningContainers(ApplicationId applicationId); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 82227ce..f6224a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Comparator; import java.util.HashMap; import java.util.List; @@ -941,6 +942,28 @@ FiCaSchedulerNode getNode(NodeId nodeId) { } @Override + public List getAllRunningContainers(ApplicationId applicationId) { + SchedulerApplication app = applications.get(applicationId); + if (app == null) { + return null; + } + Collection liveContainers = + app.getCurrentAppAttempt().getLiveContainers(); + List containerList = + new ArrayList(liveContainers.size()); + ContainerId amContainerId = + this.rmContext.getRMApps().get(applicationId).getCurrentAppAttempt() + .getMasterContainer().getId(); + for (RMContainer rmContainer : liveContainers) { + if (rmContainer.getState().equals(RMContainerState.RUNNING) + && !rmContainer.getContainerId().equals(amContainerId)) { + containerList.add(rmContainer.getContainer()); + } + } + return containerList; + } + + @Override public RMContainer getRMContainer(ContainerId containerId) { FiCaSchedulerApp attempt = (FiCaSchedulerApp) getCurrentAttemptForContainer(containerId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 4233eb9..ba52be6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -264,12 +264,35 @@ public QueueManager getQueueManager() { } @Override + public List getAllRunningContainers(ApplicationId applicationId) { + SchedulerApplication app = applications.get(applicationId); + if (app == null) { + return null; + } + Collection liveContainers = + app.getCurrentAppAttempt().getLiveContainers(); + List containerList = + new ArrayList(liveContainers.size()); + ContainerId amContainerId = + this.rmContext.getRMApps().get(applicationId).getCurrentAppAttempt() + .getMasterContainer().getId(); + for (RMContainer rmContainer : liveContainers) { + if (rmContainer.getState().equals(RMContainerState.RUNNING) + && !rmContainer.getContainerId().equals(amContainerId)) { + containerList.add(rmContainer.getContainer()); + } + } + return containerList; + } + + @Override public RMContainer getRMContainer(ContainerId containerId) { FSSchedulerApp attempt = (FSSchedulerApp) getCurrentAttemptForContainer(containerId); return (attempt == null) ? null : attempt.getRMContainer(containerId); } + private SchedulerApplicationAttempt getCurrentAttemptForContainer( ContainerId containerId) { SchedulerApplication app = @@ -1361,5 +1384,4 @@ public void onReload(AllocationConfiguration queueInfo) { queue.collectSchedulerApplications(apps); return apps; } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index b39cd3f..63609be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -912,6 +913,28 @@ public synchronized SchedulerNodeReport getNodeReport(NodeId nodeId) { } @Override + public List getAllRunningContainers(ApplicationId applicationId) { + SchedulerApplication app = applications.get(applicationId); + if (app == null) { + return null; + } + Collection liveContainers = + app.getCurrentAppAttempt().getLiveContainers(); + List containerList = + new ArrayList(liveContainers.size()); + ContainerId amContainerId = + this.rmContext.getRMApps().get(applicationId).getCurrentAppAttempt() + .getMasterContainer().getId(); + for (RMContainer rmContainer : liveContainers) { + if (rmContainer.getState().equals(RMContainerState.RUNNING) + && !rmContainer.getContainerId().equals(amContainerId)) { + containerList.add(rmContainer.getContainer()); + } + } + return containerList; + } + + @Override public RMContainer getRMContainer(ContainerId containerId) { FiCaSchedulerApp attempt = (FiCaSchedulerApp) getCurrentAttemptForContainer(containerId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index 2c52f1a..1d8c860 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -24,6 +24,7 @@ import junit.framework.Assert; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; @@ -100,7 +101,7 @@ public void testAMRestartWithExistingContainers() throws Exception { ContainerId.newInstance(am1.getApplicationAttemptId(), 4); rm1.waitForState(nm1, containerId4, RMContainerState.ACQUIRED); - // fail the AM by sending CONTAINER_FINISHED event without registering. + // fail the AM by sending CONTAINER_FINISHED event without unregistering. nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE); am1.waitForState(RMAppAttemptState.FAILED); @@ -117,7 +118,28 @@ public void testAMRestartWithExistingContainers() throws Exception { ApplicationAttemptId newAttemptId = app1.getCurrentAppAttempt().getAppAttemptId(); Assert.assertFalse(newAttemptId.equals(am1.getApplicationAttemptId())); - MockAM am2 = TestRM.launchAM(app1, rm1, nm1); + + // launch the new AM + RMAppAttempt attempt2 = app1.getCurrentAppAttempt(); + nm1.nodeHeartbeat(true); + MockAM am2 = rm1.sendAMLaunched(attempt2.getAppAttemptId()); + RegisterApplicationMasterResponse registerResponse = + am2.registerAppAttempt(); + + // Assert two containers are running: container2 and container3; + Assert.assertEquals(2, registerResponse.getRunningContainers().size()); + boolean containerId2Exists = false, containerId3Exists = false; + for (Container container : registerResponse.getRunningContainers()) { + if (container.getId().equals(containerId2)) { + containerId2Exists = true; + } + if (container.getId().equals(containerId3)) { + containerId3Exists = true; + } + } + Assert.assertTrue(containerId2Exists && containerId3Exists); + + rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING); // complete container by sending the complete event to earlier attempt. nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 3, ContainerState.COMPLETE);