diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerId.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerId.java index 73e8085..f141c08 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerId.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ContainerId.java @@ -34,13 +34,21 @@ @Stable public abstract class ContainerId implements Comparable{ + // Prototype Placeholder so that all legacy code does not need to change to + // compile + public static ContainerId newInstance(ApplicationAttemptId appAttemptId, + int containerId) { + return newInstance(appAttemptId, containerId, System.currentTimeMillis()); + } + @Private @Unstable public static ContainerId newInstance(ApplicationAttemptId appAttemptId, - int containerId) { + int containerId, long clusterTimestamp) { ContainerId id = Records.newRecord(ContainerId.class); id.setId(containerId); id.setApplicationAttemptId(appAttemptId); + id.setClusterTimestamp(clusterTimestamp); id.build(); return id; } @@ -80,6 +88,14 @@ public static ContainerId newInstance(ApplicationAttemptId appAttemptId, @Private @Unstable protected abstract void setId(int id); + + @Private + @Unstable + public abstract long getClusterTimestamp(); + + @Private + @Unstable + protected abstract void setClusterTimestamp(long timestamp); // TODO: fail the app submission if attempts are more than 10 or something @@ -113,6 +129,7 @@ public int hashCode() { int result = 7507; result = prime * result + getId(); result = prime * result + getApplicationAttemptId().hashCode(); + result = (int) (prime * result + getClusterTimestamp()); return result; } @@ -129,6 +146,8 @@ public boolean equals(Object obj) { return false; if (this.getId() != other.getId()) return false; + if (this.getClusterTimestamp() != other.getClusterTimestamp()) + return false; return true; } @@ -136,7 +155,12 @@ public boolean equals(Object obj) { public int compareTo(ContainerId other) { if (this.getApplicationAttemptId().compareTo( other.getApplicationAttemptId()) == 0) { - return this.getId() - other.getId(); + if (this.getClusterTimestamp() == other.getClusterTimestamp()) { + return this.getId() - other.getId(); + } + else { + return (int) (this.getClusterTimestamp() - other.getClusterTimestamp()); + } } else { return this.getApplicationAttemptId().compareTo( other.getApplicationAttemptId()); @@ -155,6 +179,7 @@ public String toString() { sb.append( appAttemptIdFormat.get().format( getApplicationAttemptId().getAttemptId())).append("_"); + sb.append(getClusterTimestamp()).append("_"); sb.append(containerIdFormat.get().format(getId())); return sb.toString(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 0a11948..36d51e5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -318,6 +318,11 @@ public static final String RECOVERY_ENABLED = RM_PREFIX + "recovery.enabled"; public static final boolean DEFAULT_RM_RECOVERY_ENABLED = false; + public static final String RM_WORK_PRESERVING_RECOVERY_ENABLED = RM_PREFIX + + "work-preserving.recovery.enabled"; + public static final boolean DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED = + false; + /** Zookeeper interaction configs */ public static final String RM_ZK_PREFIX = RM_PREFIX + "zk-"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 48aac9d..1f95a90 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -51,6 +51,7 @@ message ContainerIdProto { optional ApplicationIdProto app_id = 1; optional ApplicationAttemptIdProto app_attempt_id = 2; optional int32 id = 3; + optional int64 cluster_timestamp = 4; } message ResourceProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java index 363f666..8885769 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java @@ -60,7 +60,7 @@ public void testResourceTrackerOnHA() throws Exception { // make sure registerNodeManager works when failover happens RegisterNodeManagerRequest request = RegisterNodeManagerRequest.newInstance(nodeId, 0, resource, - YarnVersionInfo.getVersion(), null); + YarnVersionInfo.getVersion(), null, null); resourceTracker.registerNodeManager(request); Assert.assertTrue(waitForNodeManagerToConnect(10000, nodeId)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java index 9be829f..b2eca63 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerIdPBImpl.java @@ -59,6 +59,18 @@ protected void setId(int id) { builder.setId((id)); } + @Override + public long getClusterTimestamp() { + Preconditions.checkNotNull(proto); + return proto.getClusterTimestamp(); + } + + @Override + protected void setClusterTimestamp(long timestamp) { + Preconditions.checkNotNull(builder); + builder.setClusterTimestamp(timestamp); + } + @Override public ApplicationAttemptId getApplicationAttemptId() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java index e2e04f3..bbba670 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java @@ -117,6 +117,7 @@ public void write(DataOutput out) throws IOException { out.writeInt(applicationId.getId()); out.writeInt(applicationAttemptId.getAttemptId()); out.writeInt(this.containerId.getId()); + out.writeLong(this.containerId.getClusterTimestamp()); out.writeUTF(this.nmHostAddr); out.writeUTF(this.appSubmitter); out.writeInt(this.resource.getMemory()); @@ -133,7 +134,7 @@ public void readFields(DataInput in) throws IOException { ApplicationAttemptId applicationAttemptId = ApplicationAttemptId.newInstance(applicationId, in.readInt()); this.containerId = - ContainerId.newInstance(applicationAttemptId, in.readInt()); + ContainerId.newInstance(applicationAttemptId, in.readInt(), in.readLong()); this.nmHostAddr = in.readUTF(); this.appSubmitter = in.readUTF(); int memory = in.readInt(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java index f731af9..b88c489 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java @@ -1,4 +1,5 @@ /** +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -176,7 +177,8 @@ public static ContainerId toContainerId(String containerIdStr) { try { ApplicationAttemptId appAttemptID = toApplicationAttemptId(it); ContainerId containerId = - ContainerId.newInstance(appAttemptID, Integer.parseInt(it.next())); + ContainerId.newInstance(appAttemptID, Integer.parseInt(it.next()), + Long.parseLong(it.next())); return containerId; } catch (NumberFormatException n) { throw new IllegalArgumentException("Invalid ContainerId: " diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java index 6ca3861..957b37d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java @@ -20,6 +20,7 @@ import java.util.List; +import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; @@ -29,7 +30,8 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, int httpPort, Resource resource, String nodeManagerVersionId, - List containerStatuses) { + List containerStatuses, + List containerReports) { RegisterNodeManagerRequest request = Records.newRecord(RegisterNodeManagerRequest.class); request.setHttpPort(httpPort); @@ -37,18 +39,21 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, request.setNodeId(nodeId); request.setNMVersion(nodeManagerVersionId); request.setContainerStatuses(containerStatuses); + request.setContainerReports(containerReports); return request; } - + public abstract NodeId getNodeId(); public abstract int getHttpPort(); public abstract Resource getResource(); public abstract String getNMVersion(); public abstract List getContainerStatuses(); + public abstract List getContainerReports(); public abstract void setNodeId(NodeId nodeId); public abstract void setHttpPort(int port); public abstract void setResource(Resource resource); public abstract void setNMVersion(String version); public abstract void setContainerStatuses(List containerStatuses); + public abstract void setContainerReports(List containerReports); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java index 6f9c43d..f248591 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java @@ -23,17 +23,22 @@ import java.util.Iterator; import java.util.List; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerReportPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerReportProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto; @@ -50,7 +55,8 @@ private Resource resource = null; private NodeId nodeId = null; private List containerStatuses = null; - + private List containerReports = null; + public RegisterNodeManagerRequestPBImpl() { builder = RegisterNodeManagerRequestProto.newBuilder(); } @@ -71,6 +77,9 @@ private void mergeLocalToBuilder() { if (this.containerStatuses != null) { addContainerStatusesToProto(); } + if (this.containerReports != null) { + addContainerReportsToProto(); + } if (this.resource != null) { builder.setResource(convertToProtoFormat(this.resource)); } @@ -158,7 +167,13 @@ public void setHttpPort(int httpPort) { initContainerStatuses(); return containerStatuses; } - + + @Override + public List getContainerReports() { + initContainerReports(); + return containerReports; + } + private void initContainerStatuses() { if (this.containerStatuses != null) { return; @@ -171,6 +186,18 @@ private void initContainerStatuses() { } } + private void initContainerReports() { + if (this.containerReports != null) { + return; + } + RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getContainerReportsList(); + this.containerReports = new ArrayList(); + for (YarnProtos.ContainerReportProto c : list) { + this.containerReports.add(convertFromProtoFormat(c)); + } + } + @Override public void setContainerStatuses(List containers) { if (containers == null) { @@ -179,7 +206,16 @@ public void setContainerStatuses(List containers) { initContainerStatuses(); this.containerStatuses.addAll(containers); } - + + @Override + public void setContainerReports(List containerReports) { + if (containerReports == null) { + return; + } + initContainerReports(); + this.containerReports.addAll(containerReports); + } + private void addContainerStatusesToProto() { maybeInitBuilder(); builder.clearContainerStatuses(); @@ -212,6 +248,39 @@ public void remove() { }; builder.addAllContainerStatuses(it); } + + private void addContainerReportsToProto() { + maybeInitBuilder(); + builder.clearContainerReports(); + if (containerReports == null) { + return; + } + Iterable it = new Iterable() { + + @Override + public Iterator iterator() { + return new Iterator() { + Iterator iter = containerReports.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public ContainerReportProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllContainerReports(it); + } @Override public int hashCode() { @@ -262,8 +331,16 @@ private ResourceProto convertToProtoFormat(Resource t) { private ContainerStatusPBImpl convertFromProtoFormat(ContainerStatusProto c) { return new ContainerStatusPBImpl(c); } - + + private ContainerReportPBImpl convertFromProtoFormat(ContainerReportProto c) { + return new ContainerReportPBImpl(c); + } + private ContainerStatusProto convertToProtoFormat(ContainerStatus c) { return ((ContainerStatusPBImpl)c).getProto(); } + + private ContainerReportProto convertToProtoFormat(ContainerReport c) { + return ((ContainerReportPBImpl)c).getProto(); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java index ac25c00..d9e5f2c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -142,6 +143,12 @@ public static ContainerId newContainerId(ApplicationAttemptId appAttemptId, return ContainerId.newInstance(appAttemptId, containerId); } + public static ContainerId newContainerId(ApplicationAttemptId appAttemptId, + int containerId, + long clusterTimestamp) { + return ContainerId.newInstance(appAttemptId, containerId, clusterTimestamp); + } + public static ContainerId newContainerId(int appId, int appAttemptId, long timestamp, int id) { ApplicationId applicationId = newApplicationId(timestamp, appId); @@ -199,6 +206,20 @@ public static ContainerStatus newContainerStatus(ContainerId containerId, return containerStatus; } + // TODO Define a better class that adds properties we care about for work + // preserving RM restart node update and not other properties that we don't + // care about + public static ContainerReport newContainerReport(ContainerId containerId, + ContainerState containerState, + Resource resource) { + ContainerReport containerReport = recordFactory + .newRecordInstance(ContainerReport.class); + containerReport.setAllocatedResource(resource); + containerReport.setContainerId(containerId); + containerReport.setContainerState(containerState); + return containerReport; + } + public static Container newContainer(ContainerId containerId, NodeId nodeId, String nodeHttpAddress, Resource resource, Priority priority, Token containerToken) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index c544905..d1bb4f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -31,6 +31,7 @@ message RegisterNodeManagerRequestProto { optional ResourceProto resource = 4; optional string nm_version = 5; repeated ContainerStatusProto containerStatuses = 6; + repeated ContainerReportProto containerReports = 7; } message RegisterNodeManagerResponseProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 57ff127..3f13d14 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -80,7 +80,8 @@ private static CompositeServiceShutdownHook nodeManagerShutdownHook; private AtomicBoolean isStopping = new AtomicBoolean(false); - + private boolean rmWorkPreservingRestartEnbaled; + public NodeManager() { super(NodeManager.class.getName()); } @@ -130,6 +131,10 @@ protected void serviceInit(Configuration conf) throws Exception { conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); + rmWorkPreservingRestartEnbaled = conf.getBoolean(YarnConfiguration + .RM_WORK_PRESERVING_RECOVERY_ENABLED, + YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_ENABLED); + boolean recoveryEnabled = conf.getBoolean( YarnConfiguration.NM_RECOVERY_ENABLED, YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED); @@ -245,7 +250,10 @@ public void run() { LOG.info("Notifying ContainerManager to block new container-requests"); containerManager.setBlockNewContainerRequests(true); LOG.info("Cleaning up running containers on resync"); - containerManager.cleanupContainersOnNMResync(); + if (!rmWorkPreservingRestartEnbaled) + { + containerManager.cleanupContainersOnNMResync(); + } ((NodeStatusUpdaterImpl) nodeStatusUpdater) .rebootNodeStatusUpdaterAndRegisterWithRM(); } catch (YarnRuntimeException e) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 4db000c..5bb16bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -42,6 +42,7 @@ import org.apache.hadoop.util.VersionUtil; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -247,13 +248,18 @@ protected ResourceTracker getRMClient() throws IOException { protected void registerWithRM() throws YarnException, IOException { List containerStatuses = getContainerStatuses(); + List containerReports = getContainerReports(); RegisterNodeManagerRequest request = RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource, - nodeManagerVersionId, containerStatuses); - if (containerStatuses != null) { + nodeManagerVersionId, containerStatuses, containerReports); + if (containerStatuses != null && !containerStatuses.isEmpty()) { LOG.info("Registering with RM using finished containers :" + containerStatuses); } + if (containerReports != null && !containerReports.isEmpty()) { + LOG.info("Registering with RM using container reports :" + + containerReports); + } RegisterNodeManagerResponse regNMResponse = resourceTracker.registerNodeManager(request); this.rmIdentifier = regNMResponse.getRMIdentifier(); @@ -375,6 +381,22 @@ private NodeStatus getNodeStatus(int responseId) { return containerStatuses; } + // Iterate through the NMContext and clone and get all the containers' + // report. + @VisibleForTesting + protected List getContainerReports() { + List containerReports = new ArrayList(); + for (Container container : this.context.getContainers().values()) { + org.apache.hadoop.yarn.api.records.ContainerReport containerReport = + container.cloneAndGetContainerReport(); + containerReports.add(containerReport); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Sending out container report: " + containerReports); + } + return containerReports; + } + private void addCompletedContainer(Container container) { synchronized (previousCompletedContainers) { previousCompletedContainers.add(container.getContainerId()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java index e69e61a..0c9bf7d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java @@ -25,6 +25,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.EventHandler; @@ -50,6 +51,8 @@ ContainerStatus cloneAndGetContainerStatus(); + ContainerReport cloneAndGetContainerReport(); + String toString(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 50653f5..fa5dc95 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; @@ -388,6 +389,17 @@ public ContainerStatus cloneAndGetContainerStatus() { } @Override + public ContainerReport cloneAndGetContainerReport() { + this.readLock.lock(); + try { + return BuilderUtils.newContainerReport(this.containerId, + getCurrentState(), getResource()); + } finally { + this.readLock.unlock(); + } + } + + @Override public ContainerId getContainerId() { return this.containerId; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java index a021214..18d622e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.Dispatcher; @@ -112,6 +113,17 @@ public ContainerStatus cloneAndGetContainerStatus() { } @Override + public ContainerReport cloneAndGetContainerReport() { + ContainerReport containerReport = recordFactory + .newRecordInstance(ContainerReport.class); + containerReport.setContainerState( + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING); + containerReport.setDiagnosticsInfo("testing"); + containerReport.setContainerExitStatus(0); + return containerReport; + } + + @Override public String toString() { return ""; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java index eecf039..4b7ded5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; @@ -203,7 +204,12 @@ public boolean isPmemCheckEnabled() { @Override public ContainerState getContainerState() { return ContainerState.RUNNING; - }; + } + + @Override + public ContainerReport cloneAndGetContainerReport() { + return null; + } }; nmContext.getContainers().put(containerId, container); //TODO: Gross hack. Fix in code. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index 79fb5df..4f32034 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -48,6 +48,8 @@ boolean isHAEnabled(); + boolean isWorkPreservingRestartEnabled(); + HAServiceState getHAServiceState(); RMStateStore getStateStore(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 1eb4b75..7088a68 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -81,6 +81,7 @@ private ApplicationMasterService applicationMasterService; private RMApplicationHistoryWriter rmApplicationHistoryWriter; private ConfigurationProvider configurationProvider; + private boolean isWorkPreservingRestartEnabled; /** * Default constructor. To be used in conjunction with setter methods for @@ -231,6 +232,10 @@ void setHAEnabled(boolean isHAEnabled) { this.isHAEnabled = isHAEnabled; } + void setWorkPreservingRestartEnabled(boolean isWorkPreservingRestartEnabled) { + this.isWorkPreservingRestartEnabled = isWorkPreservingRestartEnabled; + } + void setHAServiceState(HAServiceState haServiceState) { synchronized (haServiceState) { this.haServiceState = haServiceState; @@ -323,6 +328,11 @@ public boolean isHAEnabled() { } @Override + public boolean isWorkPreservingRestartEnabled() { return + isWorkPreservingRestartEnabled; + } + + @Override public HAServiceState getHAServiceState() { synchronized (haServiceState) { return haServiceState; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 054ec04..d896144 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -219,6 +219,10 @@ protected void serviceInit(Configuration conf) throws Exception { addService(adminService); rmContext.setRMAdminService(adminService); + this.rmContext.setWorkPreservingRestartEnabled(conf.getBoolean + (YarnConfiguration.RM_WORKPRESERVING_RESTART_ENABLED, + YarnConfiguration.DEFAULT_RM_WORKPRESERVING_RESTART_ENABLED)); + this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf)); if (this.rmContext.isHAEnabled()) { HAUtil.verifyAndSetConfiguration(this.conf);