diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 9727e91..09bd5c9 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -585,6 +585,16 @@ public void rampDownReduces(int rampDown) { if (response.getAMCommand() != null) { switch(response.getAMCommand()) { case AM_RESYNC: + if (rmWorkPreservingRestartEnabled) { + LOG.info("ApplicationMaster is out of sync with ResourceManager," + + " hence resyncing."); + lastResponseID = 0; + + // Registering to allow RM to discover an active AM for this application + + register(); + break; + } case AM_SHUTDOWN: // This can happen if the RM has been restarted. If it is in that state, // this application must clean itself up. diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java index a9b5ce5..c803ce4 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java @@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -56,7 +57,8 @@ private static final Log LOG = LogFactory.getLog(RMContainerRequestor.class); - private int lastResponseID; + protected int lastResponseID; + protected boolean rmWorkPreservingRestartEnabled; private Resource availableResources; private final RecordFactory recordFactory = @@ -131,6 +133,9 @@ public String toString() { @Override protected void serviceInit(Configuration conf) throws Exception { super.serviceInit(conf); + rmWorkPreservingRestartEnabled = conf.getBoolean(MRJobConfig + .MR_RM_WORKPRESERVING_RESTART_ENABLED, + MRJobConfig.DEFAULT_MR_RM_WORKPRESERVING_RESTART_ENABLED); nodeBlacklistingEnabled = conf.getBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true); LOG.info("nodeBlacklistingEnabled:" + nodeBlacklistingEnabled); @@ -163,6 +168,13 @@ protected AllocateResponse makeRemoteRequest() throws IOException { } catch (YarnException e) { throw new IOException(e); } + + if (rmWorkPreservingRestartEnabled && isResyncCommand(allocateResponse)) { + LOG.warn("ApplicationMaster is out of sync with ResourceManager," + + " hence resyncing."); + return allocateResponse; + } + lastResponseID = allocateResponse.getResponseId(); availableResources = allocateResponse.getAvailableResources(); lastClusterNmCount = clusterNmCount; @@ -190,6 +202,12 @@ protected AllocateResponse makeRemoteRequest() throws IOException { blacklistRemovals.clear(); return allocateResponse; } + + private boolean isResyncCommand(AllocateResponse allocateResponse) + { + return allocateResponse.getAMCommand() != null + && allocateResponse.getAMCommand() == AMCommand.AM_RESYNC; + } // May be incorrect if there's multiple NodeManagers running on a single host. // knownNodeCount is based on node managers, not hosts. blacklisting is diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 33d79ee..31cd2c5 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -533,6 +533,12 @@ MR_AM_PREFIX + "scheduler.connection.wait.interval-ms"; public static final int DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS = 360000; + + public static final String MR_RM_WORKPRESERVING_RESTART_ENABLED = + MR_AM_PREFIX + "rm.workpreservingrestart.enabled"; + public static final boolean DEFAULT_MR_RM_WORKPRESERVING_RESTART_ENABLED = + true; + /** * How long to wait in milliseconds for the output committer to cancel * an operation when the job is being killed diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerId.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerId.java index 73e8085..f141c08 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerId.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerId.java @@ -34,13 +34,21 @@ @Stable public abstract class ContainerId implements Comparable{ + // Prototype Placeholder so that all legacy code does not need to change to + // compile + public static ContainerId newInstance(ApplicationAttemptId appAttemptId, + int containerId) { + return newInstance(appAttemptId, containerId, System.currentTimeMillis()); + } + @Private @Unstable public static ContainerId newInstance(ApplicationAttemptId appAttemptId, - int containerId) { + int containerId, long clusterTimestamp) { ContainerId id = Records.newRecord(ContainerId.class); id.setId(containerId); id.setApplicationAttemptId(appAttemptId); + id.setClusterTimestamp(clusterTimestamp); id.build(); return id; } @@ -80,6 +88,14 @@ public static ContainerId newInstance(ApplicationAttemptId appAttemptId, @Private @Unstable protected abstract void setId(int id); + + @Private + @Unstable + public abstract long getClusterTimestamp(); + + @Private + @Unstable + protected abstract void setClusterTimestamp(long timestamp); // TODO: fail the app submission if attempts are more than 10 or something @@ -113,6 +129,7 @@ public int hashCode() { int result = 7507; result = prime * result + getId(); result = prime * result + getApplicationAttemptId().hashCode(); + result = (int) (prime * result + getClusterTimestamp()); return result; } @@ -129,6 +146,8 @@ public boolean equals(Object obj) { return false; if (this.getId() != other.getId()) return false; + if (this.getClusterTimestamp() != other.getClusterTimestamp()) + return false; return true; } @@ -136,7 +155,12 @@ public boolean equals(Object obj) { public int compareTo(ContainerId other) { if (this.getApplicationAttemptId().compareTo( other.getApplicationAttemptId()) == 0) { - return this.getId() - other.getId(); + if (this.getClusterTimestamp() == other.getClusterTimestamp()) { + return this.getId() - other.getId(); + } + else { + return (int) (this.getClusterTimestamp() - other.getClusterTimestamp()); + } } else { return this.getApplicationAttemptId().compareTo( other.getApplicationAttemptId()); @@ -155,6 +179,7 @@ public String toString() { sb.append( appAttemptIdFormat.get().format( getApplicationAttemptId().getAttemptId())).append("_"); + sb.append(getClusterTimestamp()).append("_"); sb.append(containerIdFormat.get().format(getId())); return sb.toString(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 0a11948..0228e58 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -355,6 +355,10 @@ public static final String RM_HA_ENABLED = RM_HA_PREFIX + "enabled"; public static final boolean DEFAULT_RM_HA_ENABLED = false; + public static final String RM_WORKPRESERVING_RESTART_ENABLED = RM_PREFIX + + "workpreservingrestart.enabled"; + public static final boolean DEFAULT_RM_WORKPRESERVING_RESTART_ENABLED = true; + public static final String RM_HA_IDS = RM_HA_PREFIX + "rm-ids"; public static final String RM_HA_ID = RM_HA_PREFIX + "id"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 48aac9d..1f95a90 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -51,6 +51,7 @@ message ContainerIdProto { optional ApplicationIdProto app_id = 1; optional ApplicationAttemptIdProto app_attempt_id = 2; optional int32 id = 3; + optional int64 cluster_timestamp = 4; } message ResourceProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java index 498dbe3..c22839f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java @@ -60,7 +60,7 @@ public void testResourceTrackerOnHA() throws Exception { // make sure registerNodeManager works when failover happens RegisterNodeManagerRequest request = RegisterNodeManagerRequest.newInstance(nodeId, 0, resource, - YarnVersionInfo.getVersion(), null); + YarnVersionInfo.getVersion(), null, null); resourceTracker.registerNodeManager(request); Assert.assertTrue(waitForNodeManagerToConnect(10000, nodeId)); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java index 9be829f..b2eca63 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java @@ -59,6 +59,18 @@ protected void setId(int id) { builder.setId((id)); } + @Override + public long getClusterTimestamp() { + Preconditions.checkNotNull(proto); + return proto.getClusterTimestamp(); + } + + @Override + protected void setClusterTimestamp(long timestamp) { + Preconditions.checkNotNull(builder); + builder.setClusterTimestamp(timestamp); + } + @Override public ApplicationAttemptId getApplicationAttemptId() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java index e2e04f3..bbba670 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java @@ -117,6 +117,7 @@ public void write(DataOutput out) throws IOException { out.writeInt(applicationId.getId()); out.writeInt(applicationAttemptId.getAttemptId()); out.writeInt(this.containerId.getId()); + out.writeLong(this.containerId.getClusterTimestamp()); out.writeUTF(this.nmHostAddr); out.writeUTF(this.appSubmitter); out.writeInt(this.resource.getMemory()); @@ -133,7 +134,7 @@ public void readFields(DataInput in) throws IOException { ApplicationAttemptId applicationAttemptId = ApplicationAttemptId.newInstance(applicationId, in.readInt()); this.containerId = - ContainerId.newInstance(applicationAttemptId, in.readInt()); + ContainerId.newInstance(applicationAttemptId, in.readInt(), in.readLong()); this.nmHostAddr = in.readUTF(); this.appSubmitter = in.readUTF(); int memory = in.readInt(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java index f731af9..4e081ca 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java @@ -176,7 +176,8 @@ public static ContainerId toContainerId(String containerIdStr) { try { ApplicationAttemptId appAttemptID = toApplicationAttemptId(it); ContainerId containerId = - ContainerId.newInstance(appAttemptID, Integer.parseInt(it.next())); + ContainerId.newInstance(appAttemptID, Integer.parseInt(it.next()), + Long.parseLong(it.next())); return containerId; } catch (NumberFormatException n) { throw new IllegalArgumentException("Invalid ContainerId: " diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java index 6ca3861..957b37d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java @@ -20,6 +20,7 @@ import java.util.List; +import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; @@ -29,7 +30,8 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, int httpPort, Resource resource, String nodeManagerVersionId, - List containerStatuses) { + List containerStatuses, + List containerReports) { RegisterNodeManagerRequest request = Records.newRecord(RegisterNodeManagerRequest.class); request.setHttpPort(httpPort); @@ -37,18 +39,21 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, request.setNodeId(nodeId); request.setNMVersion(nodeManagerVersionId); request.setContainerStatuses(containerStatuses); + request.setContainerReports(containerReports); return request; } - + public abstract NodeId getNodeId(); public abstract int getHttpPort(); public abstract Resource getResource(); public abstract String getNMVersion(); public abstract List getContainerStatuses(); + public abstract List getContainerReports(); public abstract void setNodeId(NodeId nodeId); public abstract void setHttpPort(int port); public abstract void setResource(Resource resource); public abstract void setNMVersion(String version); public abstract void setContainerStatuses(List containerStatuses); + public abstract void setContainerReports(List containerReports); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java index 6f9c43d..f248591 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java @@ -23,17 +23,22 @@ import java.util.Iterator; import java.util.List; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerReportPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerReportProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto; @@ -50,7 +55,8 @@ private Resource resource = null; private NodeId nodeId = null; private List containerStatuses = null; - + private List containerReports = null; + public RegisterNodeManagerRequestPBImpl() { builder = RegisterNodeManagerRequestProto.newBuilder(); } @@ -71,6 +77,9 @@ private void mergeLocalToBuilder() { if (this.containerStatuses != null) { addContainerStatusesToProto(); } + if (this.containerReports != null) { + addContainerReportsToProto(); + } if (this.resource != null) { builder.setResource(convertToProtoFormat(this.resource)); } @@ -158,7 +167,13 @@ public void setHttpPort(int httpPort) { initContainerStatuses(); return containerStatuses; } - + + @Override + public List getContainerReports() { + initContainerReports(); + return containerReports; + } + private void initContainerStatuses() { if (this.containerStatuses != null) { return; @@ -171,6 +186,18 @@ private void initContainerStatuses() { } } + private void initContainerReports() { + if (this.containerReports != null) { + return; + } + RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getContainerReportsList(); + this.containerReports = new ArrayList(); + for (YarnProtos.ContainerReportProto c : list) { + this.containerReports.add(convertFromProtoFormat(c)); + } + } + @Override public void setContainerStatuses(List containers) { if (containers == null) { @@ -179,7 +206,16 @@ public void setContainerStatuses(List containers) { initContainerStatuses(); this.containerStatuses.addAll(containers); } - + + @Override + public void setContainerReports(List containerReports) { + if (containerReports == null) { + return; + } + initContainerReports(); + this.containerReports.addAll(containerReports); + } + private void addContainerStatusesToProto() { maybeInitBuilder(); builder.clearContainerStatuses(); @@ -212,6 +248,39 @@ public void remove() { }; builder.addAllContainerStatuses(it); } + + private void addContainerReportsToProto() { + maybeInitBuilder(); + builder.clearContainerReports(); + if (containerReports == null) { + return; + } + Iterable it = new Iterable() { + + @Override + public Iterator iterator() { + return new Iterator() { + Iterator iter = containerReports.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public ContainerReportProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllContainerReports(it); + } @Override public int hashCode() { @@ -262,8 +331,16 @@ private ResourceProto convertToProtoFormat(Resource t) { private ContainerStatusPBImpl convertFromProtoFormat(ContainerStatusProto c) { return new ContainerStatusPBImpl(c); } - + + private ContainerReportPBImpl convertFromProtoFormat(ContainerReportProto c) { + return new ContainerReportPBImpl(c); + } + private ContainerStatusProto convertToProtoFormat(ContainerStatus c) { return ((ContainerStatusPBImpl)c).getProto(); } + + private ContainerReportProto convertToProtoFormat(ContainerReport c) { + return ((ContainerReportPBImpl)c).getProto(); + } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java index ac25c00..d9e5f2c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -142,6 +143,12 @@ public static ContainerId newContainerId(ApplicationAttemptId appAttemptId, return ContainerId.newInstance(appAttemptId, containerId); } + public static ContainerId newContainerId(ApplicationAttemptId appAttemptId, + int containerId, + long clusterTimestamp) { + return ContainerId.newInstance(appAttemptId, containerId, clusterTimestamp); + } + public static ContainerId newContainerId(int appId, int appAttemptId, long timestamp, int id) { ApplicationId applicationId = newApplicationId(timestamp, appId); @@ -199,6 +206,20 @@ public static ContainerStatus newContainerStatus(ContainerId containerId, return containerStatus; } + // TODO Define a better class that adds properties we care about for work + // preserving RM restart node update and not other properties that we don't + // care about + public static ContainerReport newContainerReport(ContainerId containerId, + ContainerState containerState, + Resource resource) { + ContainerReport containerReport = recordFactory + .newRecordInstance(ContainerReport.class); + containerReport.setAllocatedResource(resource); + containerReport.setContainerId(containerId); + containerReport.setContainerState(containerState); + return containerReport; + } + public static Container newContainer(ContainerId containerId, NodeId nodeId, String nodeHttpAddress, Resource resource, Priority priority, Token containerToken) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index c544905..d1bb4f1 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -31,6 +31,7 @@ message RegisterNodeManagerRequestProto { optional ResourceProto resource = 4; optional string nm_version = 5; repeated ContainerStatusProto containerStatuses = 6; + repeated ContainerReportProto containerReports = 7; } message RegisterNodeManagerResponseProto { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 57ff127..a4a2709 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -80,7 +80,8 @@ private static CompositeServiceShutdownHook nodeManagerShutdownHook; private AtomicBoolean isStopping = new AtomicBoolean(false); - + private boolean rmWorkPreservingRestartEnbaled; + public NodeManager() { super(NodeManager.class.getName()); } @@ -130,6 +131,10 @@ protected void serviceInit(Configuration conf) throws Exception { conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); + rmWorkPreservingRestartEnbaled = conf.getBoolean(YarnConfiguration + .RM_WORKPRESERVING_RESTART_ENABLED, + YarnConfiguration.DEFAULT_RM_WORKPRESERVING_RESTART_ENABLED); + boolean recoveryEnabled = conf.getBoolean( YarnConfiguration.NM_RECOVERY_ENABLED, YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED); @@ -245,7 +250,10 @@ public void run() { LOG.info("Notifying ContainerManager to block new container-requests"); containerManager.setBlockNewContainerRequests(true); LOG.info("Cleaning up running containers on resync"); - containerManager.cleanupContainersOnNMResync(); + if (!rmWorkPreservingRestartEnbaled) + { + containerManager.cleanupContainersOnNMResync(); + } ((NodeStatusUpdaterImpl) nodeStatusUpdater) .rebootNodeStatusUpdaterAndRegisterWithRM(); } catch (YarnRuntimeException e) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 4db000c..5bb16bf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -42,6 +42,7 @@ import org.apache.hadoop.util.VersionUtil; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -247,13 +248,18 @@ protected ResourceTracker getRMClient() throws IOException { protected void registerWithRM() throws YarnException, IOException { List containerStatuses = getContainerStatuses(); + List containerReports = getContainerReports(); RegisterNodeManagerRequest request = RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource, - nodeManagerVersionId, containerStatuses); - if (containerStatuses != null) { + nodeManagerVersionId, containerStatuses, containerReports); + if (containerStatuses != null && !containerStatuses.isEmpty()) { LOG.info("Registering with RM using finished containers :" + containerStatuses); } + if (containerReports != null && !containerReports.isEmpty()) { + LOG.info("Registering with RM using container reports :" + + containerReports); + } RegisterNodeManagerResponse regNMResponse = resourceTracker.registerNodeManager(request); this.rmIdentifier = regNMResponse.getRMIdentifier(); @@ -375,6 +381,22 @@ private NodeStatus getNodeStatus(int responseId) { return containerStatuses; } + // Iterate through the NMContext and clone and get all the containers' + // report. + @VisibleForTesting + protected List getContainerReports() { + List containerReports = new ArrayList(); + for (Container container : this.context.getContainers().values()) { + org.apache.hadoop.yarn.api.records.ContainerReport containerReport = + container.cloneAndGetContainerReport(); + containerReports.add(containerReport); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Sending out container report: " + containerReports); + } + return containerReports; + } + private void addCompletedContainer(Container container) { synchronized (previousCompletedContainers) { previousCompletedContainers.add(container.getContainerId()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java index e69e61a..0c9bf7d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java @@ -25,6 +25,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.EventHandler; @@ -50,6 +51,8 @@ ContainerStatus cloneAndGetContainerStatus(); + ContainerReport cloneAndGetContainerReport(); + String toString(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 50653f5..fa5dc95 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; @@ -388,6 +389,17 @@ public ContainerStatus cloneAndGetContainerStatus() { } @Override + public ContainerReport cloneAndGetContainerReport() { + this.readLock.lock(); + try { + return BuilderUtils.newContainerReport(this.containerId, + getCurrentState(), getResource()); + } finally { + this.readLock.unlock(); + } + } + + @Override public ContainerId getContainerId() { return this.containerId; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java index a021214..18d622e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.Dispatcher; @@ -112,6 +113,17 @@ public ContainerStatus cloneAndGetContainerStatus() { } @Override + public ContainerReport cloneAndGetContainerReport() { + ContainerReport containerReport = recordFactory + .newRecordInstance(ContainerReport.class); + containerReport.setContainerState( + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING); + containerReport.setDiagnosticsInfo("testing"); + containerReport.setContainerExitStatus(0); + return containerReport; + } + + @Override public String toString() { return ""; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java index eecf039..4b7ded5 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; @@ -203,7 +204,12 @@ public boolean isPmemCheckEnabled() { @Override public ContainerState getContainerState() { return ContainerState.RUNNING; - }; + } + + @Override + public ContainerReport cloneAndGetContainerReport() { + return null; + } }; nmContext.getContainers().put(containerId, container); //TODO: Gross hack. Fix in code. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 6b2cb7f..c01c937 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -394,6 +394,17 @@ public boolean hasApplicationMasterRegistered( return hasApplicationMasterRegistered; } + /** + * + * @param appAttemptId + * @return true if application is known either as registered (responseId >=0) + * or unregistered (responseId == -1) + */ + public boolean isApplicationAttemptKnown( + ApplicationAttemptId appAttemptId) { + return responseMap.containsKey(appAttemptId); + } + @Override public AllocateResponse allocate(AllocateRequest request) throws YarnException, IOException { @@ -411,6 +422,16 @@ public AllocateResponse allocate(AllocateRequest request) synchronized (lock) { AllocateResponse lastResponse = lock.getAllocateResponse(); if (!hasApplicationMasterRegistered(appAttemptId)) { + if (this.rmContext.isWorkPreservingRestartEnabled() + && isApplicationAttemptKnown(appAttemptId)) { + String message = + "Application Master is not registered for known application: " + + appAttemptId.getApplicationId() + + ". Looks like RM rebooted. Let AM resync."; + LOG.info(message); + return resync; + } + String message = "Application Master is trying to allocate before registering for: " + appAttemptId.getApplicationId(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index 79fb5df..4f32034 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -48,6 +48,8 @@ boolean isHAEnabled(); + boolean isWorkPreservingRestartEnabled(); + HAServiceState getHAServiceState(); RMStateStore getStateStore(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 1eb4b75..7088a68 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -81,6 +81,7 @@ private ApplicationMasterService applicationMasterService; private RMApplicationHistoryWriter rmApplicationHistoryWriter; private ConfigurationProvider configurationProvider; + private boolean isWorkPreservingRestartEnabled; /** * Default constructor. To be used in conjunction with setter methods for @@ -231,6 +232,10 @@ void setHAEnabled(boolean isHAEnabled) { this.isHAEnabled = isHAEnabled; } + void setWorkPreservingRestartEnabled(boolean isWorkPreservingRestartEnabled) { + this.isWorkPreservingRestartEnabled = isWorkPreservingRestartEnabled; + } + void setHAServiceState(HAServiceState haServiceState) { synchronized (haServiceState) { this.haServiceState = haServiceState; @@ -323,6 +328,11 @@ public boolean isHAEnabled() { } @Override + public boolean isWorkPreservingRestartEnabled() { return + isWorkPreservingRestartEnabled; + } + + @Override public HAServiceState getHAServiceState() { synchronized (haServiceState) { return haServiceState; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 054ec04..d896144 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -219,6 +219,10 @@ protected void serviceInit(Configuration conf) throws Exception { addService(adminService); rmContext.setRMAdminService(adminService); + this.rmContext.setWorkPreservingRestartEnabled(conf.getBoolean + (YarnConfiguration.RM_WORKPRESERVING_RESTART_ENABLED, + YarnConfiguration.DEFAULT_RM_WORKPRESERVING_RESTART_ENABLED)); + this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf)); if (this.rmContext.isHAEnabled()) { HAUtil.verifyAndSetConfiguration(this.conf); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 1d40320..8a66226 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -217,7 +218,13 @@ void handleContainerStatus(ContainerStatus containerStatus) { } RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptId); - Container masterContainer = rmAppAttempt.getMasterContainer(); + if (rmAppAttempt == null) { + LOG.debug("Ignoring container completion status for unknown AM" + + rmApp.getApplicationId()); + return; + } + Container masterContainer = rmAppAttempt + .getMasterContainer(); if (masterContainer.getId().equals(containerStatus.getContainerId()) && containerStatus.getState() == ContainerState.COMPLETE) { // sending master container finished event. @@ -305,7 +312,8 @@ public RegisterNodeManagerResponse registerNodeManager( RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode); if (oldNode == null) { this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeEvent(nodeId, RMNodeEventType.STARTED)); + new RMNodeStartedEvent(nodeId, request.getContainerStatuses(), + request.getContainerReports())); } else { LOG.info("Reconnect from the node at: " + host); this.nmLivelinessMonitor.unregister(nodeId); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index f4f2e20..1c95c55 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; @@ -763,6 +764,13 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { return RMAppState.SUBMITTED; } + if (app.rmContext.isWorkPreservingRestartEnabled()) { + // Let scheduler know about this attempt so it can allow AM to register + boolean disableTransferState = false; + app.handler.handle(new AppAttemptAddedSchedulerEvent(app.currentAttempt + .getAppAttemptId(), disableTransferState)); + } + // YARN-1507 is saving the application state after the application is // accepted. So after YARN-1507, an app is saved meaning it is accepted. // Thus we return ACCECPTED state on recovery. diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index efe0721..1f8e0cf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -285,6 +285,9 @@ RMAppAttemptEventType.KILL, new FinalSavingTransition(new FinalTransition( RMAppAttemptState.KILLED), RMAppAttemptState.KILLED)) + .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.LAUNCHED, + RMAppAttemptEventType.ATTEMPT_ADDED, + new RecoveredAppAttemptAddedTransition()) // Transitions from RUNNING State .addTransition(RMAppAttemptState.RUNNING, @@ -313,6 +316,9 @@ RMAppAttemptEventType.KILL, new FinalSavingTransition(new FinalTransition( RMAppAttemptState.KILLED), RMAppAttemptState.KILLED)) + .addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING, + RMAppAttemptEventType.ATTEMPT_ADDED, + new RecoveredAppAttemptAddedTransition()) // Transitions from FINAL_SAVING State .addTransition(RMAppAttemptState.FINAL_SAVING, @@ -920,11 +926,31 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, * heart beat back). */ (new AMLaunchedTransition()).transition(appAttempt, event); + + if (appAttempt.rmContext.isWorkPreservingRestartEnabled()) { + // Need to register an app attempt before AM can register + appAttempt.masterService + .registerAppAttempt(appAttempt.applicationAttemptId); + } + return RMAppAttemptState.LAUNCHED; } } } + private static class RecoveredAppAttemptAddedTransition + implements SingleArcTransition { + @Override + public void transition(RMAppAttemptImpl rmAppAttempt, RMAppAttemptEvent + rmAppAttemptEvent) { + // TODO we should validated that this is a recovered application + LOG.info("Nothing to do for adding an application " + + rmAppAttempt.getAppAttemptId() + + " that's recovered and in current state " + + rmAppAttempt.getAppAttemptState()); + } + } + private void rememberTargetTransitions(RMAppAttemptEvent event, Object transitionToDo, RMAppAttemptState targetFinalState) { transitionTodo = transitionToDo; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index 2921891..2bca120 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -106,6 +106,10 @@ RMContainerEventType.RELEASED, new KillTransition()) .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING, RMContainerEventType.EXPIRE) + // Do nothing for containers that have LAUNCH event after being recovered + // as RUNNING + .addTransition(RMContainerState.RUNNING, RMContainerState.RUNNING, + RMContainerEventType.LAUNCHED) // Transitions from COMPLETED state .addTransition(RMContainerState.COMPLETED, RMContainerState.COMPLETED, diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index dc53a5d..6eaec47 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -329,7 +329,7 @@ public NodeState getState() { } } - + @Override public List getContainersToCleanUp() { @@ -461,8 +461,18 @@ private void updateMetricsForDeactivatedNode(NodeState initialState, public void transition(RMNodeImpl rmNode, RMNodeEvent event) { // Inform the scheduler + RMNodeStartedEvent nodeStartedEvent = (RMNodeStartedEvent) event; + if (rmNode.context.isWorkPreservingRestartEnabled()) { + // Reprocess container updates + processContainerUpdates(rmNode, nodeStartedEvent + .getContainers()); + } + + // Send containerReports as we need to process resource capability and + // other information which is not part of the ContainerStatus rmNode.context.getDispatcher().getEventHandler().handle( - new NodeAddedSchedulerEvent(rmNode)); + new NodeAddedSchedulerEvent(rmNode, nodeStartedEvent + .getContainerReports())); rmNode.context.getDispatcher().getEventHandler().handle( new NodesListManagerEvent( NodesListManagerEventType.NODE_USABLE, rmNode)); @@ -485,8 +495,11 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { @Override public void transition(RMNodeImpl rmNode, RMNodeEvent event) { - // Kill containers since node is rejoining. - rmNode.nodeUpdateQueue.clear(); + // Unless work preserving restart is enabled, kill containers since node + // is rejoining. + if (!rmNode.context.isWorkPreservingRestartEnabled()) { + rmNode.nodeUpdateQueue.clear(); + } rmNode.context.getDispatcher().getEventHandler().handle( new NodeRemovedSchedulerEvent(rmNode)); @@ -512,7 +525,8 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { } rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode); rmNode.context.getDispatcher().getEventHandler().handle( - new RMNodeEvent(newNode.getNodeID(), RMNodeEventType.STARTED)); + new RMNodeStartedEvent(newNode.getNodeID(), + new ArrayList())); } rmNode.context.getDispatcher().getEventHandler().handle( new NodesListManagerEvent( @@ -606,49 +620,8 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { return NodeState.UNHEALTHY; } - // Filter the map to only obtain just launched containers and finished - // containers. - List newlyLaunchedContainers = - new ArrayList(); - List completedContainers = - new ArrayList(); - for (ContainerStatus remoteContainer : statusEvent.getContainers()) { - ContainerId containerId = remoteContainer.getContainerId(); - - // Don't bother with containers already scheduled for cleanup, or for - // applications already killed. The scheduler doens't need to know any - // more about this container - if (rmNode.containersToClean.contains(containerId)) { - LOG.info("Container " + containerId + " already scheduled for " + - "cleanup, no further processing"); - continue; - } - if (rmNode.finishedApplications.contains(containerId - .getApplicationAttemptId().getApplicationId())) { - LOG.info("Container " + containerId - + " belongs to an application that is already killed," - + " no further processing"); - continue; - } + processContainerUpdates(rmNode, statusEvent.getContainers()); - // Process running containers - if (remoteContainer.getState() == ContainerState.RUNNING) { - if (!rmNode.justLaunchedContainers.containsKey(containerId)) { - // Just launched container. RM knows about it the first time. - rmNode.justLaunchedContainers.put(containerId, remoteContainer); - newlyLaunchedContainers.add(remoteContainer); - } - } else { - // A finished container - rmNode.justLaunchedContainers.remove(containerId); - completedContainers.add(remoteContainer); - } - } - if(newlyLaunchedContainers.size() != 0 - || completedContainers.size() != 0) { - rmNode.nodeUpdateQueue.add(new UpdatedContainerInfo - (newlyLaunchedContainers, completedContainers)); - } if(rmNode.nextHeartBeat) { rmNode.nextHeartBeat = false; rmNode.context.getDispatcher().getEventHandler().handle( @@ -666,6 +639,53 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { } } + private static void processContainerUpdates(RMNodeImpl rmNode, List + containerStatuses) { + // Filter the map to only obtain just launched containers and finished + // containers. + List newlyLaunchedContainers = + new ArrayList(); + List completedContainers = + new ArrayList(); + for (ContainerStatus remoteContainer : containerStatuses) { + ContainerId containerId = remoteContainer.getContainerId(); + + // Don't bother with containers already scheduled for cleanup, or for + // applications already killed. The scheduler doens't need to know any + // more about this container + if (rmNode.containersToClean.contains(containerId)) { + LOG.info("Container " + containerId + " already scheduled for " + + "cleanup, no further processing"); + continue; + } + if (rmNode.finishedApplications.contains(containerId + .getApplicationAttemptId().getApplicationId())) { + LOG.info("Container " + containerId + + " belongs to an application that is already killed," + + " no further processing"); + continue; + } + + // Process running containers + if (remoteContainer.getState() == ContainerState.RUNNING) { + if (!rmNode.justLaunchedContainers.containsKey(containerId)) { + // Just launched container. RM knows about it the first time. + rmNode.justLaunchedContainers.put(containerId, remoteContainer); + newlyLaunchedContainers.add(remoteContainer); + } + } else { + // A finished container + rmNode.justLaunchedContainers.remove(containerId); + completedContainers.add(remoteContainer); + } + } + if(newlyLaunchedContainers.size() != 0 + || completedContainers.size() != 0) { + rmNode.nodeUpdateQueue.add(new UpdatedContainerInfo + (newlyLaunchedContainers, completedContainers)); + } + } + public static class StatusUpdateWhenUnHealthyTransition implements MultipleArcTransition { @@ -680,6 +700,8 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { rmNode.setLastHealthReportTime( remoteNodeHealthStatus.getLastHealthReportTime()); if (remoteNodeHealthStatus.getIsNodeHealthy()) { + // TODO For work preserving RM restart do we need to send all existing + // container updates in case RM restarted during the same time rmNode.context.getDispatcher().getEventHandler().handle( new NodeAddedSchedulerEvent(rmNode)); rmNode.context.getDispatcher().getEventHandler().handle( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java new file mode 100644 index 0000000..914b26c --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmnode; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerReport; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; + +public class RMNodeStartedEvent extends RMNodeEvent { + + private final List containersCollection; + private final List containerReports; + + public RMNodeStartedEvent(NodeId nodeId, + List collection) { + this(nodeId, collection, new ArrayList()); + } + + public RMNodeStartedEvent(NodeId nodeId, + List containerStatuses, + List containerReports) { + super(nodeId, RMNodeEventType.STARTED); + this.containersCollection = containerStatuses; + this.containerReports = containerReports; + } + + public List getContainers() { + return this.containersCollection; + } + + public List getContainerReports() { + return this.containerReports; + } +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index de71f71..8f9b1df 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -244,15 +244,20 @@ public synchronized boolean isBlacklisted(String resourceName) { * the request * @param container * the containers allocated. + * @param trackRequests + * should we track requests (false during RM restart) */ synchronized public void allocate(NodeType type, SchedulerNode node, - Priority priority, ResourceRequest request, Container container) { - if (type == NodeType.NODE_LOCAL) { - allocateNodeLocal(node, priority, request, container); - } else if (type == NodeType.RACK_LOCAL) { - allocateRackLocal(node, priority, request, container); - } else { - allocateOffSwitch(node, priority, request, container); + Priority priority, ResourceRequest request, Container container, + boolean trackRequests) { + if (trackRequests) { + if (type == NodeType.NODE_LOCAL) { + allocateNodeLocal(node, priority, request, container); + } else if (type == NodeType.RACK_LOCAL) { + allocateRackLocal(node, priority, request, container); + } else { + allocateOffSwitch(node, priority, request, container); + } } QueueMetrics metrics = queue.getMetrics(); if (pending) { @@ -269,7 +274,11 @@ synchronized public void allocate(NodeType type, SchedulerNode node, + " user=" + user + " resource=" + request.getCapability()); } - metrics.allocateResources(user, 1, request.getCapability(), true); + + // If we are in a restart state we are not trackingRequests, hence no + // need to (and cannot) decrement pending + boolean decrementPending = trackRequests; + metrics.allocateResources(user, 1, request.getCapability(), decrementPending); } /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index fc7e047..0a97d38 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -399,6 +399,23 @@ public ContainersAndNMTokensAllocation(List containerList, return new ContainersAndNMTokensAllocation(returnContainerList, nmTokens); } + // During work preserving restart, container token and NMToken need to be + // retrieved + public synchronized void + setExistingContainersAndNMTokens() { + for (Iterator i = newlyAllocatedContainers.iterator(); i + .hasNext();) { + RMContainer rmContainer = i.next(); + Container container = rmContainer.getContainer(); + + // TODO get existing container token and NMToken. + // For now this is a placeholder for ensuring we perform ACQUIRED + i.remove(); + rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(), + RMContainerEventType.ACQUIRED)); + } + } + public synchronized void updateBlacklist( List blacklistAdditions, List blacklistRemovals) { if (!isStopped) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 470cb10..c74422b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -129,7 +129,7 @@ synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node, liveContainers.put(container.getId(), rmContainer); // Update consumption and track allocations - appSchedulingInfo.allocate(type, node, priority, request, container); + appSchedulingInfo.allocate(type, node, priority, request, container, true); Resources.addTo(currentConsumption, container.getResource()); // Inform the container diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java index c487f48..2836982 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java @@ -18,19 +18,34 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; +import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import java.util.ArrayList; +import java.util.List; + public class NodeAddedSchedulerEvent extends SchedulerEvent { private final RMNode rmNode; + private final List containerReports; + + public NodeAddedSchedulerEvent(RMNode rmNode) + { + this(rmNode, new ArrayList()); + } - public NodeAddedSchedulerEvent(RMNode rmNode) { + public NodeAddedSchedulerEvent(RMNode rmNode, List + containerReports) { super(SchedulerEventType.NODE_ADDED); this.rmNode = rmNode; + this.containerReports = containerReports; + } + + public List getContainerReports() { + return containerReports; } public RMNode getAddedRMNode() { return rmNode; } - } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java index 9ed5179..c8760c6 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java @@ -155,7 +155,8 @@ public Container createContainer( NodeId nodeId = node.getRMNode().getNodeID(); ContainerId containerId = BuilderUtils.newContainerId(application - .getApplicationAttemptId(), application.getNewContainerId()); + .getApplicationAttemptId(), application.getNewContainerId(), + System.currentTimeMillis()); // Create the container Container container = diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java index adabfef..f752544 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java @@ -246,9 +246,19 @@ public synchronized NodeType getAllowedLocalityLevelByTime(Priority priority, return allowedLocalityLevel.get(priority); } + synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node, + Priority priority, ResourceRequest request, + Container container) { + return allocate(type, node, priority, request, container, true); + } + + // Common method used in normal allocate (trackRequests = true) and on + // resync after an RM restart (trackRequests = false). On resync we have + // no prior knowledge of requests and we cannot process them, + // which is ok as the AM will resend its pending requests synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node, - Priority priority, ResourceRequest request, - Container container) { + Priority priority, ResourceRequest request, Container container, + boolean trackRequests) { // Update allowed locality level NodeType allowed = allowedLocalityLevel.get(priority); if (allowed != null) { @@ -265,8 +275,11 @@ else if (allowed.equals(NodeType.RACK_LOCAL) && // Required sanity check - AM can call 'allocate' to update resource // request without locking the scheduler, hence we need to check - if (getTotalRequiredResources(priority) <= 0) { - return null; + + if (trackRequests) { + if (getTotalRequiredResources(priority) <= 0) { + return null; + } } // Create RMContainer @@ -279,7 +292,7 @@ else if (allowed.equals(NodeType.RACK_LOCAL) && liveContainers.put(container.getId(), rmContainer); // Update consumption and track allocations - appSchedulingInfo.allocate(type, node, priority, request, container); + appSchedulingInfo.allocate(type, node, priority, request, container, trackRequests); Resources.addTo(currentConsumption, container.getResource()); // Inform the container diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index fab9ebe..d23da54 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; @@ -50,6 +51,7 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -71,10 +73,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; @@ -91,6 +95,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; + +import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; @@ -825,13 +831,65 @@ private synchronized void completedContainer(RMContainer rmContainer, + " with event: " + event); } - private synchronized void addNode(RMNode node) { - nodes.put(node.getNodeID(), new FSSchedulerNode(node, usePortForNodeName)); + private synchronized void addNode(RMNode node, List + containerReports) { + FSSchedulerNode schedulerNode = + new FSSchedulerNode(node, usePortForNodeName); + nodes.put(node.getNodeID(), schedulerNode); Resources.addTo(clusterCapacity, node.getTotalCapability()); updateRootQueueMetrics(); LOG.info("Added node " + node.getNodeAddress() + " cluster capacity: " + clusterCapacity); + + if (rmContext.isWorkPreservingRestartEnabled()) { + HashSet appsToAcquireContainersFor = new HashSet(); + + // Set the containers that are already running to its app first as ALLOCATED + for (ContainerReport alreadyLaunchedContainer : containerReports) { + ContainerId containerId = alreadyLaunchedContainer.getContainerId(); + FSSchedulerApp app = getSchedulerApp(containerId + .getApplicationAttemptId()); + + appsToAcquireContainersFor.add(containerId.getApplicationAttemptId()); + + // TODO Do we need a valid NodeType & token if we are not tracking requests + NodeType dummyType = NodeType.NODE_LOCAL; + Priority priority = alreadyLaunchedContainer.getPriority(); + Resource capability = alreadyLaunchedContainer.getAllocatedResource(); + ResourceRequest resourceRequest = ResourceRequest.newInstance + (priority, node.getHostName(), capability, 1); + Token nullContainerToken = null; + Container recreatedContainer = BuilderUtils.newContainer(containerId, + schedulerNode.getNodeID(), schedulerNode.getHttpAddress(), + resourceRequest.getCapability(), resourceRequest.getPriority(), + nullContainerToken); + + // Assigning the container + RMContainer allocatedContainer = app.allocate(dummyType, schedulerNode, + priority, + resourceRequest, + recreatedContainer, + false); + + schedulerNode.allocateContainer(app.getApplicationId(), + allocatedContainer); + } + + // Now redo ACQUIRED for the container to set it as ACQUIRED + for (ApplicationAttemptId applicationAttemptId : + appsToAcquireContainersFor) { + FSSchedulerApp app = getSchedulerApp(applicationAttemptId); + app.setExistingContainersAndNMTokens(); + } + + // Reprocessing the already launched containers's LAUNCHED event to mark + // them as RUNNING + for (ContainerReport alreadyLaunchedContainer : containerReports) { + containerLaunchedOnNode(alreadyLaunchedContainer.getContainerId(), + schedulerNode); + } + } } private synchronized void removeNode(RMNode rmNode) { @@ -1155,7 +1213,8 @@ public void handle(SchedulerEvent event) { throw new RuntimeException("Unexpected event type: " + event); } NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event; - addNode(nodeAddedEvent.getAddedRMNode()); + addNode(nodeAddedEvent.getAddedRMNode(), + nodeAddedEvent.getContainerReports()); break; case NODE_REMOVED: if (!(event instanceof NodeRemovedSchedulerEvent)) { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index caee228..fa1b3fc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.security.PrivilegedAction; +import java.util.ArrayList; import java.util.Map; import org.junit.Assert; @@ -47,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; @@ -69,6 +71,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -333,7 +336,8 @@ public MockNM registerNode(String nodeIdStr, int memory, int vCores) public void sendNodeStarted(MockNM nm) throws Exception { RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get( nm.getNodeId()); - node.handle(new RMNodeEvent(nm.getNodeId(), RMNodeEventType.STARTED)); + node.handle(new RMNodeStartedEvent(nm.getNodeId(), new ArrayList + ())); } public void sendNodeLost(MockNM nm) throws Exception { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 3c453eb..cba50b2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -49,6 +49,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent; + +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; @@ -160,7 +162,8 @@ private RMNodeStatusEvent getMockRMNodeStatusEvent() { @Test (timeout = 5000) public void testExpiredContainer() { // Start the node - node.handle(new RMNodeEvent(null, RMNodeEventType.STARTED)); + node.handle(new RMNodeStartedEvent(null, new ArrayList + ())); verify(scheduler).handle(any(NodeAddedSchedulerEvent.class)); // Expire a container @@ -188,11 +191,13 @@ public void testExpiredContainer() { @Test (timeout = 5000) public void testContainerUpdate() throws InterruptedException{ //Start the node - node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED)); + node.handle(new RMNodeStartedEvent(null, new ArrayList + ())); NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1); RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null); - node2.handle(new RMNodeEvent(null,RMNodeEventType.STARTED)); + node2.handle(new RMNodeStartedEvent(null,new ArrayList + ())); ContainerId completedContainerIdFromNode1 = BuilderUtils.newContainerId( BuilderUtils.newApplicationAttemptId( @@ -248,7 +253,8 @@ public void testContainerUpdate() throws InterruptedException{ @Test (timeout = 5000) public void testStatusChange(){ //Start the node - node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED)); + node.handle(new RMNodeStartedEvent(null,new ArrayList + ())); //Add info to the queue first node.setNextHeartBeat(false); @@ -460,7 +466,8 @@ private RMNodeImpl getRunningNode() { RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0, null, ResourceOption.newInstance(capability, RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), null); - node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED)); + node.handle(new RMNodeStartedEvent(node.getNodeID(), new ArrayList + ())); Assert.assertEquals(NodeState.RUNNING, node.getState()); return node; } @@ -491,7 +498,8 @@ public void testAdd() { int initialUnhealthy = cm.getUnhealthyNMs(); int initialDecommissioned = cm.getNumDecommisionedNMs(); int initialRebooted = cm.getNumRebootedNMs(); - node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED)); + node.handle(new RMNodeStartedEvent(node.getNodeID(), new ArrayList + ())); Assert.assertEquals("Active Nodes", initialActive + 1, cm.getNumActiveNMs()); Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs()); Assert.assertEquals("Unhealthy Nodes", diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java index 93fd300..ca2029c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java @@ -77,7 +77,7 @@ public void testMove() { app.liveContainers.put(container1.getContainerId(), container1); SchedulerNode node = createNode(); app.appSchedulingInfo.allocate(NodeType.OFF_SWITCH, node, requestedPriority, - request, container1.getContainer()); + request, container1.getContainer(), true); // Reserved container Priority prio1 = Priority.newInstance(1);