diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ContainerRecoveryReport.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ContainerRecoveryReport.java new file mode 100644 index 0000000..2d4031f --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ContainerRecoveryReport.java @@ -0,0 +1,92 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.api.protocolrecords; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.Records; + +public abstract class ContainerRecoveryReport { + + public static ContainerRecoveryReport newInstance(ContainerId containerId, + ContainerState containerState, Resource allocatedResource, + String diagnostics, int containerExitStatus) { + ContainerRecoveryReport report = + Records.newRecord(ContainerRecoveryReport.class); + report.setContainerId(containerId); + report.setContainerState(containerState); + report.setAllocatedResource(allocatedResource); + report.setDiagnostics(diagnostics); + report.setContainerExitStatus(containerExitStatus); + return report; + } + + /** + * Get the ContainerId of the container. + * + * @return ContainerId of the container. + */ + public abstract ContainerId getContainerId(); + + public abstract void setContainerId(ContainerId containerId); + + /** + * Get the allocated Resource of the container. + * + * @return allocated Resource of the container. + */ + public abstract Resource getAllocatedResource(); + + + public abstract void setAllocatedResource(Resource resource); + + /** + * Get the DiagnosticsInfo of the container. + * + * @return DiagnosticsInfo of the container + */ + public abstract String getDiagnostics(); + + public abstract void setDiagnostics(String diagnostics); + + + public abstract ContainerState getContainerState(); + + public abstract void setContainerState(ContainerState containerState); + + /** + * Get the final exit status of the container. + * + * @return final exit status of the container. + */ + public abstract int getContainerExitStatus(); + + + public abstract void setContainerExitStatus(int containerExitStatus); + + /** + * Get the Priority of the request. + * @return Priority of the request + */ + public abstract Priority getPriority(); + + public abstract void setPriority(Priority priority); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java index 6ca3861..2802d1c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java @@ -20,7 +20,6 @@ import java.util.List; -import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.util.Records; @@ -29,14 +28,14 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, int httpPort, Resource resource, String nodeManagerVersionId, - List containerStatuses) { + List containerReports) { RegisterNodeManagerRequest request = Records.newRecord(RegisterNodeManagerRequest.class); request.setHttpPort(httpPort); request.setResource(resource); request.setNodeId(nodeId); request.setNMVersion(nodeManagerVersionId); - request.setContainerStatuses(containerStatuses); + request.setContainerRecoveryReports(containerReports); return request; } @@ -44,11 +43,12 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, public abstract int getHttpPort(); public abstract Resource getResource(); public abstract String getNMVersion(); - public abstract List getContainerStatuses(); + public abstract List getContainerRecoveryReports(); public abstract void setNodeId(NodeId nodeId); public abstract void setHttpPort(int port); public abstract void setResource(Resource resource); public abstract void setNMVersion(String version); - public abstract void setContainerStatuses(List containerStatuses); + public abstract void setContainerRecoveryReports( + List containerReports); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ContainerRecoveryReportPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ContainerRecoveryReportPBImpl.java new file mode 100644 index 0000000..30cefe0 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ContainerRecoveryReportPBImpl.java @@ -0,0 +1,248 @@ +package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; +import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto; +import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerRecoveryReportProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerRecoveryReportProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.protocolrecords.ContainerRecoveryReport; + +import com.google.protobuf.TextFormat; + +public class ContainerRecoveryReportPBImpl extends ContainerRecoveryReport { + + ContainerRecoveryReportProto proto = ContainerRecoveryReportProto + .getDefaultInstance(); + ContainerRecoveryReportProto.Builder builder = null; + boolean viaProto = false; + + private ContainerId containerId = null; + private Resource resource = null; + private Priority priority = null; + + public ContainerRecoveryReportPBImpl() { + builder = ContainerRecoveryReportProto.newBuilder(); + } + + public ContainerRecoveryReportPBImpl(ContainerRecoveryReportProto proto) { + this.proto = proto; + viaProto = true; + } + + public ContainerRecoveryReportProto getProto() { + + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return this.getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + @Override + public String toString() { + return TextFormat.shortDebugString(getProto()); + } + + @Override + public Resource getAllocatedResource() { + if (this.resource != null) { + return this.resource; + } + ContainerRecoveryReportProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasResource()) { + return null; + } + this.resource = convertFromProtoFormat(p.getResource()); + return this.resource; + } + + @Override + public ContainerId getContainerId() { + if (this.containerId != null) { + return this.containerId; + } + ContainerRecoveryReportProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasContainerId()) { + return null; + } + this.containerId = convertFromProtoFormat(p.getContainerId()); + return this.containerId; + } + + @Override + public String getDiagnostics() { + ContainerRecoveryReportProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasDiagnostics()) { + return null; + } + return (p.getDiagnostics()); + } + + @Override + public ContainerState getContainerState() { + ContainerRecoveryReportProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasContainerState()) { + return null; + } + return convertFromProtoFormat(p.getContainerState()); + } + + @Override + public void setAllocatedResource(Resource resource) { + maybeInitBuilder(); + if (resource == null) + builder.clearResource(); + this.resource = resource; + } + + public void setContainerId(ContainerId containerId) { + maybeInitBuilder(); + if (containerId == null) + builder.clearContainerId(); + this.containerId = containerId; + } + + @Override + public void setDiagnostics(String diagnosticsInfo) { + maybeInitBuilder(); + if (diagnosticsInfo == null) { + builder.clearDiagnostics(); + return; + } + builder.setDiagnostics(diagnosticsInfo); + } + + @Override + public void setContainerState(ContainerState containerState) { + maybeInitBuilder(); + if (containerState == null) { + builder.clearContainerState(); + return; + } + builder.setContainerState(convertToProtoFormat(containerState)); + } + + @Override + public int getContainerExitStatus() { + ContainerRecoveryReportProtoOrBuilder p = viaProto ? proto : builder; + return p.getContainerExitStatus(); + } + + @Override + public void setContainerExitStatus(int containerExitStatus) { + maybeInitBuilder(); + builder.setContainerExitStatus(containerExitStatus); + } + + @Override + public Priority getPriority() { + ContainerRecoveryReportProtoOrBuilder p = viaProto ? proto : builder; + if (this.priority != null) { + return this.priority; + } + if (!p.hasPriority()) { + return null; + } + this.priority = convertFromProtoFormat(p.getPriority()); + return this.priority; + } + + @Override + public void setPriority(Priority priority) { + maybeInitBuilder(); + if (priority == null) + builder.clearPriority(); + this.priority = priority; + } + + private void mergeLocalToBuilder() { + if (this.containerId != null + && !((ContainerIdPBImpl) containerId).getProto().equals( + builder.getContainerId())) { + builder.setContainerId(convertToProtoFormat(this.containerId)); + } + + if (this.resource != null + && !((ResourcePBImpl) this.resource).getProto().equals( + builder.getResource())) { + builder.setResource(convertToProtoFormat(this.resource)); + } + + if (this.priority != null) { + builder.setPriority(convertToProtoFormat(this.priority)); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ContainerRecoveryReportProto.newBuilder(proto); + } + viaProto = false; + } + + private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { + return new ContainerIdPBImpl(p); + } + + private ContainerIdProto convertToProtoFormat(ContainerId t) { + return ((ContainerIdPBImpl) t).getProto(); + } + + private ResourcePBImpl convertFromProtoFormat(ResourceProto p) { + return new ResourcePBImpl(p); + } + + private ResourceProto convertToProtoFormat(Resource t) { + return ((ResourcePBImpl) t).getProto(); + } + + private ContainerStateProto + convertToProtoFormat(ContainerState containerState) { + return ProtoUtils.convertToProtoFormat(containerState); + } + + private ContainerState convertFromProtoFormat( + ContainerStateProto containerState) { + return ProtoUtils.convertFromProtoFormat(containerState); + } + + private PriorityPBImpl convertFromProtoFormat(PriorityProto p) { + return new PriorityPBImpl(p); + } + + private PriorityProto convertToProtoFormat(Priority t) { + return ((PriorityPBImpl)t).getProto(); + } + +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java index 6f9c43d..6085fc2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java @@ -20,24 +20,18 @@ import java.util.ArrayList; -import java.util.Iterator; import java.util.List; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; -import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; -import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerRecoveryReportProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.protocolrecords.ContainerRecoveryReport; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; @@ -49,7 +43,7 @@ private Resource resource = null; private NodeId nodeId = null; - private List containerStatuses = null; + private List containerReports = null; public RegisterNodeManagerRequestPBImpl() { builder = RegisterNodeManagerRequestProto.newBuilder(); @@ -68,8 +62,8 @@ public RegisterNodeManagerRequestProto getProto() { } private void mergeLocalToBuilder() { - if (this.containerStatuses != null) { - addContainerStatusesToProto(); + if (this.containerReports != null) { + addContainerRecoveryReportsToProto(); } if (this.resource != null) { builder.setResource(convertToProtoFormat(this.resource)); @@ -80,6 +74,18 @@ private void mergeLocalToBuilder() { } + private synchronized void addContainerRecoveryReportsToProto() { + maybeInitBuilder(); + builder.clearContainerReport(); + List list = + new ArrayList(); + for (ContainerRecoveryReport report : this.containerReports) { + list.add(convertToProtoFormat(report)); + } + builder.addAllContainerReport(list); + } + + private void mergeLocalToProto() { if (viaProto) maybeInitBuilder(); @@ -154,63 +160,31 @@ public void setHttpPort(int httpPort) { } @Override - public List getContainerStatuses() { - initContainerStatuses(); - return containerStatuses; + public List getContainerRecoveryReports() { + initContainerRecoveryReports(); + return containerReports; } - private void initContainerStatuses() { - if (this.containerStatuses != null) { + private void initContainerRecoveryReports() { + if (this.containerReports != null) { return; } RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder; - List list = p.getContainerStatusesList(); - this.containerStatuses = new ArrayList(); - for (ContainerStatusProto c : list) { - this.containerStatuses.add(convertFromProtoFormat(c)); + List list = p.getContainerReportList(); + this.containerReports = new ArrayList(); + for (ContainerRecoveryReportProto c : list) { + this.containerReports.add(convertFromProtoFormat(c)); } } @Override - public void setContainerStatuses(List containers) { - if (containers == null) { - return; - } - initContainerStatuses(); - this.containerStatuses.addAll(containers); - } - - private void addContainerStatusesToProto() { - maybeInitBuilder(); - builder.clearContainerStatuses(); - if (containerStatuses == null) { + public void setContainerRecoveryReports( + List containerReports) { + if (containerReports == null) { return; } - Iterable it = new Iterable() { - - @Override - public Iterator iterator() { - return new Iterator() { - Iterator iter = containerStatuses.iterator(); - - @Override - public boolean hasNext() { - return iter.hasNext(); - } - - @Override - public ContainerStatusProto next() { - return convertToProtoFormat(iter.next()); - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; - } - }; - builder.addAllContainerStatuses(it); + initContainerRecoveryReports(); + this.containerReports.addAll(containerReports); } @Override @@ -259,11 +233,11 @@ private ResourceProto convertToProtoFormat(Resource t) { return ((ResourcePBImpl)t).getProto(); } - private ContainerStatusPBImpl convertFromProtoFormat(ContainerStatusProto c) { - return new ContainerStatusPBImpl(c); + private ContainerRecoveryReportPBImpl convertFromProtoFormat(ContainerRecoveryReportProto c) { + return new ContainerRecoveryReportPBImpl(c); } - private ContainerStatusProto convertToProtoFormat(ContainerStatus c) { - return ((ContainerStatusPBImpl)c).getProto(); + private ContainerRecoveryReportProto convertToProtoFormat(ContainerRecoveryReport c) { + return ((ContainerRecoveryReportPBImpl)c).getProto(); } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index c544905..0230515 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -30,7 +30,7 @@ message RegisterNodeManagerRequestProto { optional int32 http_port = 3; optional ResourceProto resource = 4; optional string nm_version = 5; - repeated ContainerStatusProto containerStatuses = 6; + repeated ContainerRecoveryReportProto container_report = 6; } message RegisterNodeManagerResponseProto { @@ -58,3 +58,12 @@ message NodeHeartbeatResponseProto { optional int64 nextHeartBeatInterval = 7; optional string diagnostics_message = 8; } + +message ContainerRecoveryReportProto { + optional ContainerIdProto container_id = 1; + optional ContainerStateProto container_state = 2; + optional ResourceProto resource = 3; + optional PriorityProto priority = 4; + optional string diagnostics = 5 [default = "N/A"]; + optional int32 container_exit_status = 6; +} \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestProtocolRecords.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestProtocolRecords.java new file mode 100644 index 0000000..dd5f9d9 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestProtocolRecords.java @@ -0,0 +1,56 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.api.protocolrecords.ContainerRecoveryReport; +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ContainerRecoveryReportPBImpl; +import org.junit.Assert; +import org.junit.Test; + +public class TestProtocolRecords { + + @Test + public void testContainerRecoveryReport() { + ApplicationId appId = ApplicationId.newInstance(123456789, 1); + ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1); + ContainerId containerId = ContainerId.newInstance(attemptId, 1); + Resource resource = Resource.newInstance(1000, 200); + + ContainerRecoveryReport report = + ContainerRecoveryReport.newInstance(containerId, + ContainerState.COMPLETE, resource, "diagnostics", + ContainerExitStatus.ABORTED); + ContainerRecoveryReport reportProto = + new ContainerRecoveryReportPBImpl( + ((ContainerRecoveryReportPBImpl) report).getProto()); + Assert.assertEquals("diagnostics", reportProto.getDiagnostics()); + Assert.assertEquals(resource, reportProto.getAllocatedResource()); + Assert.assertEquals(ContainerExitStatus.ABORTED, + reportProto.getContainerExitStatus()); + Assert.assertEquals(ContainerState.COMPLETE, + reportProto.getContainerState()); + Assert.assertEquals(containerId, reportProto.getContainerId()); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index df99737..56c50fe 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.server.api.ResourceManagerConstants; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.ServerRMProxy; +import org.apache.hadoop.yarn.server.api.protocolrecords.ContainerRecoveryReport; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; @@ -246,13 +247,12 @@ protected ResourceTracker getRMClient() throws IOException { @VisibleForTesting protected void registerWithRM() throws YarnException, IOException { - List containerStatuses = getContainerStatuses(); + List containerReports = getContainerRecoveryReports(); RegisterNodeManagerRequest request = RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource, - nodeManagerVersionId, containerStatuses); - if (containerStatuses != null) { - LOG.info("Registering with RM using finished containers :" - + containerStatuses); + nodeManagerVersionId, containerReports); + if (containerReports != null) { + LOG.info("Registering with RM using containers :" + containerReports); } RegisterNodeManagerResponse regNMResponse = resourceTracker.registerNodeManager(request); @@ -375,6 +375,26 @@ private NodeStatus getNodeStatus(int responseId) { return containerStatuses; } + private List getContainerRecoveryReports() { + List containerReports = + new ArrayList(); + for (Container container : this.context.getContainers().values()) { + ContainerRecoveryReport report = + container.getContainerRecoveryReport(); + containerReports.add(report); + if (report.getContainerState().equals(ContainerState.COMPLETE)) { + // Adding to finished containers cache. Cache will keep it around at + // least for #durationToTrackStoppedContainers duration. In the + // subsequent call to stop container it will get removed from cache. + updateStoppedContainersInCache(container.getContainerId()); + addCompletedContainer(container); + } + } + LOG.info("Sending out " + containerReports.size() + + " containers for recovery: " + containerReports); + return containerReports; + } + private void addCompletedContainer(Container container) { synchronized (previousCompletedContainers) { previousCompletedContainers.add(container.getContainerId()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java index e69e61a..c168f7a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.api.protocolrecords.ContainerRecoveryReport; public interface Container extends EventHandler { @@ -39,7 +40,7 @@ ContainerTokenIdentifier getContainerTokenIdentifier(); String getUser(); - + ContainerState getContainerState(); ContainerLaunchContext getLaunchContext(); @@ -50,6 +51,8 @@ ContainerStatus cloneAndGetContainerStatus(); + ContainerRecoveryReport getContainerRecoveryReport(); + String toString(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 50653f5..38cf15a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.api.protocolrecords.ContainerRecoveryReport; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode; import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger; import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants; @@ -388,6 +389,17 @@ public ContainerStatus cloneAndGetContainerStatus() { } @Override + public ContainerRecoveryReport getContainerRecoveryReport() { + this.readLock.lock(); + try { + return ContainerRecoveryReport.newInstance(this.containerId, + getCurrentState(), getResource(), diagnostics.toString(), exitCode); + } finally { + this.readLock.unlock(); + } + } + + @Override public ContainerId getContainerId() { return this.containerId; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java index 5c2e085..fef3bd7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java @@ -27,8 +27,6 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicBoolean; -import org.junit.Assert; - import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; @@ -46,6 +44,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.api.ResourceTracker; +import org.apache.hadoop.yarn.server.api.protocolrecords.ContainerRecoveryReport; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; @@ -57,6 +56,7 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -203,7 +203,7 @@ public RegisterNodeManagerResponse registerNodeManager( if (registerCount == 0) { // first register, no containers info. try { - Assert.assertEquals(0, request.getContainerStatuses() + Assert.assertEquals(0, request.getContainerRecoveryReports() .size()); } catch (AssertionError error) { error.printStackTrace(); @@ -214,8 +214,8 @@ public RegisterNodeManagerResponse registerNodeManager( testCompleteContainer.getContainerId(), container); } else { // second register contains the completed container info. - List statuses = - request.getContainerStatuses(); + List statuses = + request.getContainerRecoveryReports(); try { Assert.assertEquals(1, statuses.size()); Assert.assertEquals(testCompleteContainer.getContainerId(), diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java index a021214..5ef3766 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.api.protocolrecords.ContainerRecoveryReport; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; @@ -134,4 +135,9 @@ public Resource getResource() { public ContainerTokenIdentifier getContainerTokenIdentifier() { return this.containerTokenIdentifier; } + + @Override + public ContainerRecoveryReport getContainerRecoveryReport() { + return null; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 1d40320..409fbee 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -32,7 +32,6 @@ import org.apache.hadoop.util.VersionUtil; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -60,6 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -240,13 +240,6 @@ public RegisterNodeManagerResponse registerNodeManager( Resource capability = request.getResource(); String nodeManagerVersion = request.getNMVersion(); - if (!request.getContainerStatuses().isEmpty()) { - LOG.info("received container statuses on node manager register :" - + request.getContainerStatuses()); - for (ContainerStatus containerStatus : request.getContainerStatuses()) { - handleContainerStatus(containerStatus); - } - } RegisterNodeManagerResponse response = recordFactory .newRecordInstance(RegisterNodeManagerResponse.class); @@ -305,7 +298,7 @@ public RegisterNodeManagerResponse registerNodeManager( RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode); if (oldNode == null) { this.rmContext.getDispatcher().getEventHandler().handle( - new RMNodeEvent(nodeId, RMNodeEventType.STARTED)); + new RMNodeStartEvent(nodeId, request.getContainerRecoveryReports())); } else { LOG.info("Reconnect from the node at: " + host); this.nmLivelinessMonitor.unregister(nodeId); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index f4f2e20..0b3c5ec 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -199,12 +199,6 @@ new FinalSavingTransition(FINISHED_TRANSITION, RMAppState.FINISHED)) .addTransition(RMAppState.ACCEPTED, RMAppState.KILLING, RMAppEventType.KILL, new KillAttemptTransition()) - // ACCECPTED state can once again receive APP_ACCEPTED event, because on - // recovery the app returns ACCEPTED state and the app once again go - // through the scheduler and triggers one more APP_ACCEPTED event at - // ACCEPTED state. - .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, - RMAppEventType.APP_ACCEPTED) // Transitions from RUNNING state .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, @@ -723,29 +717,36 @@ public void transition(RMAppImpl app, RMAppEvent event) { } } + // synchronously recover attempt to ensure any incoming external events + // to be processed after the attempt processes the recover event. + private void recoverAppAttempts() { + for (RMAppAttempt attempt : getAppAttempts().values()) { + attempt.handle(new RMAppAttemptEvent(attempt.getAppAttemptId(), + RMAppAttemptEventType.RECOVER)); + } + } + private static final class RMAppRecoveredTransition implements MultipleArcTransition { @Override public RMAppState transition(RMAppImpl app, RMAppEvent event) { - for (RMAppAttempt attempt : app.getAppAttempts().values()) { - // synchronously recover attempt to ensure any incoming external events - // to be processed after the attempt processes the recover event. - attempt.handle( - new RMAppAttemptEvent(attempt.getAppAttemptId(), - RMAppAttemptEventType.RECOVER)); - } - // The app has completed. if (app.recoveredFinalState != null) { + app.recoverAppAttempts(); new FinalTransition(app.recoveredFinalState).transition(app, event); return app.recoveredFinalState; } - // Last attempt is in final state, do not add to scheduler and just return - // ACCEPTED waiting for last RMAppAttempt to send finished or failed event - // back. + // Notify scheduler about the app on recovery + new AddApplicationToSchedulerTransition().transition(app, event); + + // recover attempts + app.recoverAppAttempts(); + + // Last attempt is in final state, return ACCEPTED waiting for last + // RMAppAttempt to send finished or failed event back. if (app.currentAttempt != null && (app.currentAttempt.getState() == RMAppAttemptState.KILLED || app.currentAttempt.getState() == RMAppAttemptState.FINISHED @@ -754,9 +755,6 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { return RMAppState.ACCEPTED; } - // Notify scheduler about the app on recovery - new AddApplicationToSchedulerTransition().transition(app, event); - // No existent attempts means the attempt associated with this app was not // started or started but not yet saved. if (app.attempts.isEmpty()) { @@ -1055,8 +1053,12 @@ public void transition(RMAppImpl app, RMAppEvent event) { if (app.finishTime == 0 ) { app.finishTime = System.currentTimeMillis(); } - app.handler.handle(new AppRemovedSchedulerEvent(app.applicationId, - finalState)); + // Recovered apps that are completed were not added to scheduler, so no + // need to remove them from scheduler. + if (app.recoveredFinalState == null) { + app.handler.handle(new AppRemovedSchedulerEvent(app.applicationId, + finalState)); + } app.handler.handle( new RMAppManagerEvent(app.applicationId, RMAppManagerEventType.APP_COMPLETED)); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java index b4bad12..4a479f8 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java @@ -203,5 +203,4 @@ * @return {@link ApplicationAttemptReport} */ ApplicationAttemptReport createApplicationAttemptReport(); - } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index e289ad5..912fbaa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -267,15 +267,17 @@ .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING, RMAppAttemptEventType.CONTAINER_FINISHED, new FinalSavingTransition( - new AMContainerCrashedTransition(), RMAppAttemptState.FAILED)) + new AMContainerCrashedBeforeRunningTransition(), RMAppAttemptState.FAILED)) // Transitions from LAUNCHED State .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.RUNNING, RMAppAttemptEventType.REGISTERED, new AMRegisteredTransition()) - .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING, + .addTransition(RMAppAttemptState.LAUNCHED, + EnumSet.of(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING), RMAppAttemptEventType.CONTAINER_FINISHED, - new FinalSavingTransition( - new AMContainerCrashedTransition(), RMAppAttemptState.FAILED)) + new ContainerFinishedTransition( + new AMContainerCrashedBeforeRunningTransition(), + RMAppAttemptState.LAUNCHED)) .addTransition( RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING, RMAppAttemptEventType.EXPIRE, @@ -302,7 +304,9 @@ RMAppAttemptState.RUNNING, EnumSet.of(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING), RMAppAttemptEventType.CONTAINER_FINISHED, - new ContainerFinishedTransition()) + new ContainerFinishedTransition( + new AMContainerCrashedAtRunningTransition(), + RMAppAttemptState.RUNNING)) .addTransition( RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING, RMAppAttemptEventType.EXPIRE, @@ -904,6 +908,10 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, } return appAttempt.recoveredFinalState; } else { + // Add the current attempt to the scheduler. + appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent( + appAttempt.getAppAttemptId(), false)); + /* * Since the application attempt's final state is not saved that means * for AM container (previous attempt) state must be one of these. @@ -1207,17 +1215,16 @@ public void transition(RMAppAttemptImpl appAttempt, } } - private static final class AMContainerCrashedTransition extends + private static final class AMContainerCrashedBeforeRunningTransition extends BaseFinalTransition { - public AMContainerCrashedTransition() { + public AMContainerCrashedBeforeRunningTransition() { super(RMAppAttemptState.FAILED); } @Override public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { - RMAppAttemptContainerFinishedEvent finishEvent = ((RMAppAttemptContainerFinishedEvent)event); @@ -1410,6 +1417,16 @@ public void transition(RMAppAttemptImpl appAttempt, implements MultipleArcTransition { + // The transition To Do after attempt final state is saved. + private BaseTransition transitionToDo; + private RMAppAttemptState currentState; + + public ContainerFinishedTransition(BaseTransition transitionToDo, + RMAppAttemptState currentState) { + this.transitionToDo = transitionToDo; + this.currentState = currentState; + } + @Override public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { @@ -1426,14 +1443,13 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, containerStatus.getContainerId())) { // Remember the follow up transition and save the final attempt state. appAttempt.rememberTargetTransitionsAndStoreState(event, - new ContainerFinishedFinalStateSavedTransition(), - RMAppAttemptState.FAILED, RMAppAttemptState.FAILED); + transitionToDo, RMAppAttemptState.FAILED, RMAppAttemptState.FAILED); return RMAppAttemptState.FINAL_SAVING; } // Normal container.Put it in completedcontainers list appAttempt.justFinishedContainers.add(containerStatus); - return RMAppAttemptState.RUNNING; + return this.currentState; } } @@ -1451,7 +1467,7 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, } } - private static class ContainerFinishedFinalStateSavedTransition extends + private static class AMContainerCrashedAtRunningTransition extends BaseTransition { @Override public void diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java index ace4435..259d68b3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java @@ -33,5 +33,7 @@ RELEASED, // Source: ContainerAllocationExpirer - EXPIRE + EXPIRE, + + RECOVER } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index 2921891..1cc7d44 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -35,12 +35,14 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.api.protocolrecords.ContainerRecoveryReport; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.state.InvalidStateTransitonException; +import org.apache.hadoop.yarn.state.MultipleArcTransition; import org.apache.hadoop.yarn.state.SingleArcTransition; import org.apache.hadoop.yarn.state.StateMachine; import org.apache.hadoop.yarn.state.StateMachineFactory; @@ -65,6 +67,9 @@ RMContainerEventType.KILL) .addTransition(RMContainerState.NEW, RMContainerState.RESERVED, RMContainerEventType.RESERVED, new ContainerReservedTransition()) + .addTransition(RMContainerState.NEW, + EnumSet.of(RMContainerState.RUNNING, RMContainerState.COMPLETED), + RMContainerEventType.RECOVER, new ContainerRecoveredTransition()) // Transitions from RESERVED state .addTransition(RMContainerState.RESERVED, RMContainerState.RESERVED, @@ -341,6 +346,34 @@ public void transition(RMContainerImpl cont, RMContainerEvent event) { } } + private static final class ContainerRecoveredTransition + implements + MultipleArcTransition { + @Override + public RMContainerState transition(RMContainerImpl container, + RMContainerEvent event) { + ContainerRecoveryReport report = + ((RMContainerRecoverEvent) event).getContainerReport(); + if (report.getContainerState().equals(ContainerState.COMPLETE)) { + ContainerStatus status = + ContainerStatus.newInstance(report.getContainerId(), + report.getContainerState(), report.getDiagnostics(), + report.getContainerExitStatus()); + + new FinishedTransition().transition(container, + new RMContainerFinishedEvent(container.containerId, status, + RMContainerEventType.FINISHED)); + return RMContainerState.COMPLETED; + } else if (report.getContainerState().equals(ContainerState.RUNNING)) { + return RMContainerState.RUNNING; + } else { + LOG.warn("RMContainer received unexpected recover event with container" + + " state " + report.getContainerState() + " while recovering."); + return RMContainerState.RUNNING; + } + } + } + private static final class ContainerReservedTransition extends BaseTransition { @@ -398,7 +431,6 @@ public void transition(RMContainerImpl container, RMContainerEvent event) { // Inform AppAttempt container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent( container.appAttemptId, finishedEvent.getRemoteContainerStatus())); - container.rmContext.getRMApplicationHistoryWriter() .containerFinished(container); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerRecoverEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerRecoverEvent.java new file mode 100644 index 0000000..f430486 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerRecoverEvent.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.api.protocolrecords.ContainerRecoveryReport; + +public class RMContainerRecoverEvent extends RMContainerEvent { + + private final ContainerRecoveryReport containerReport; + + public RMContainerRecoverEvent(ContainerId containerId, + ContainerRecoveryReport containerReport) { + super(containerId, RMContainerEventType.RECOVER); + this.containerReport = containerReport; + } + + public ContainerRecoveryReport getContainerReport() { + return containerReport; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 0d33796..0992ca3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.api.protocolrecords.ContainerRecoveryReport; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics; @@ -460,13 +461,9 @@ private void updateMetricsForDeactivatedNode(NodeState initialState, @Override public void transition(RMNodeImpl rmNode, RMNodeEvent event) { // Inform the scheduler + RMNodeStartEvent startEvent = (RMNodeStartEvent) event; + List containers = null; - rmNode.context.getDispatcher().getEventHandler().handle( - new NodeAddedSchedulerEvent(rmNode)); - rmNode.context.getDispatcher().getEventHandler().handle( - new NodesListManagerEvent( - NodesListManagerEventType.NODE_USABLE, rmNode)); - String host = rmNode.nodeId.getHost(); if (rmNode.context.getInactiveRMNodes().containsKey(host)) { // Old node rejoining @@ -476,10 +473,17 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { } else { // Increment activeNodes explicitly because this is a new node. ClusterMetrics.getMetrics().incrNumActiveNodes(); + containers = startEvent.getContainerRecoveryReports(); } + + rmNode.context.getDispatcher().getEventHandler() + .handle(new NodeAddedSchedulerEvent(rmNode, containers)); + rmNode.context.getDispatcher().getEventHandler().handle( + new NodesListManagerEvent( + NodesListManagerEventType.NODE_USABLE, rmNode)); } } - + public static class ReconnectNodeTransition implements SingleArcTransition { @@ -513,7 +517,7 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) { } rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode); rmNode.context.getDispatcher().getEventHandler().handle( - new RMNodeEvent(newNode.getNodeID(), RMNodeEventType.STARTED)); + new RMNodeStartEvent(newNode.getNodeID(), null)); } rmNode.context.getDispatcher().getEventHandler().handle( new NodesListManagerEvent( diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartEvent.java new file mode 100644 index 0000000..1b6aac9 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartEvent.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmnode; + +import java.util.List; + +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.api.protocolrecords.ContainerRecoveryReport; + +public class RMNodeStartEvent extends RMNodeEvent { + + private List containerReports; + + public RMNodeStartEvent(NodeId nodeId, List containerReports) { + super(nodeId, RMNodeEventType.STARTED); + this.containerReports = containerReports; + } + + public List getContainerRecoveryReports() { + return this.containerReports; + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 642cd31..45b3c7c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -34,11 +34,22 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.protocolrecords.ContainerRecoveryReport; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.util.resource.Resources; +@SuppressWarnings("unchecked") public abstract class AbstractYarnScheduler implements ResourceScheduler { @@ -46,8 +57,7 @@ private static final Log LOG = LogFactory.getLog(AbstractYarnScheduler.class); // Nodes in the cluster, indexed by NodeId - protected Map nodes = - new ConcurrentHashMap(); + protected Map nodes = new ConcurrentHashMap(); // Whole capacity of the cluster protected Resource clusterResource = Resource.newInstance(0, 0); @@ -57,6 +67,7 @@ protected RMContext rmContext; protected Map> applications; + protected final static List EMPTY_CONTAINER_LIST = new ArrayList(); protected static final Allocation EMPTY_ALLOCATION = new Allocation( @@ -159,4 +170,95 @@ public String moveApplication(ApplicationId appId, String newQueue) throw new YarnException(getClass().getSimpleName() + " does not support moving apps between queues"); } + + public void recoverContainersOnNode( + List containerReports, RMNode nm) { + if (containerReports == null + || (containerReports != null && containerReports.isEmpty())) { + return; + } + + for (ContainerRecoveryReport status : containerReports) { + ApplicationId appId = + status.getContainerId().getApplicationAttemptId().getApplicationId(); + RMApp rmApp = rmContext.getRMApps().get(appId); + if (rmApp == null) { + LOG.error("Skip recovering container " + status + + " for unknown application."); + continue; + } + + if (rmApp.getApplicationSubmissionContext().getUnmanagedAM()) { + LOG.info("Skip recovering container " + status + " for unmanaged AM." + + rmApp.getApplicationId()); + continue; + } + + SchedulerApplication schedulerApp = applications.get(appId); + if (schedulerApp == null) { + LOG.info("Skip recovering container " + status + + " for unknown SchedulerApplication. Application state is " + + rmApp.getState()); + continue; + } + + LOG.info("Recovering container " + status); + SchedulerApplicationAttempt schedulerAttempt = + schedulerApp.getCurrentAppAttempt(); + + // create container + RMContainer rmContainer = createContainer(status, nm); + + // recover RMContainer + rmContainer.handle(new RMContainerRecoverEvent(status.getContainerId(), + status)); + + // recover scheduler node + nodes.get(nm.getNodeID()).recoverContainer(rmContainer); + + // recover queue: update headroom etc. + Queue queue = schedulerAttempt.getQueue(); + queue.recoverContainer(clusterResource, schedulerAttempt, rmContainer); + + // recover scheduler attempt + schedulerAttempt.recoverContainer(rmContainer); + } + } + + private RMContainer createContainer(ContainerRecoveryReport status, + RMNode node) { + Container container = + Container.newInstance(status.getContainerId(), node.getNodeID(), + node.getHttpAddress(), status.getAllocatedResource(), + status.getPriority(), null); + ApplicationAttemptId attemptId = + container.getId().getApplicationAttemptId(); + RMContainer rmContainer = + new RMContainerImpl(container, attemptId, node.getNodeID(), + applications.get(attemptId.getApplicationId()).getUser(), rmContext); + return rmContainer; + } + + public SchedulerNode getSchedulerNode(NodeId nodeId) { + return nodes.get(nodeId); + } + + protected void notifyAppAccepted(ApplicationId appId) { + // No need to re-send app_accepted event to recovered apps. + if (!rmContext.getRMApps().get(appId).getState() + .equals(RMAppState.ACCEPTED)) { + rmContext.getDispatcher().getEventHandler() + .handle(new RMAppEvent(appId, RMAppEventType.APP_ACCEPTED)); + } + } + + protected void notifyAttemptAdded(ApplicationAttemptId attemptId) { + // If attempt is already at launched state, that means this is a recovered + // attempt. So no need to re-send attempt_added event to recovered attempts. + if (!rmContext.getRMApps().get(attemptId.getApplicationId()) + .getRMAppAttempt(attemptId).getState().equals(RMAppAttemptState.LAUNCHED)) { + rmContext.getDispatcher().getEventHandler() .handle( + new RMAppAttemptEvent(attemptId, RMAppAttemptEventType.ATTEMPT_ADDED)); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index de71f71..1d2f16e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -26,7 +26,6 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -39,6 +38,8 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.util.resource.Resources; /** @@ -55,7 +56,8 @@ private final String queueName; Queue queue; final String user; - private final AtomicInteger containerIdCounter = new AtomicInteger(0); + private int containerIdCounter = 0; + Object object = new Object(); final Set priorities = new TreeSet( new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator()); @@ -109,7 +111,10 @@ private synchronized void clearRequests() { } public int getNewContainerId() { - return this.containerIdCounter.incrementAndGet(); + synchronized (object) { + containerIdCounter++; + return containerIdCounter; + } } /** @@ -409,4 +414,31 @@ public synchronized void transferStateFromPreviousAppSchedulingInfo( // this.requests = appInfo.getRequests(); this.blacklist = appInfo.getBlackList(); } + + public synchronized void recoverContainer(RMContainer rmContainer) { + int containerId = rmContainer.getContainerId().getId(); + // set the containerId counter to be the max of all the Ids of recovered + // containers. + synchronized (object) { + if (containerIdCounter < containerId) { + containerIdCounter = containerId; + } + } + + QueueMetrics metrics = queue.getMetrics(); + if (pending) { + // If there was any container to recover, the application was + // running from scheduler's POV. + pending = false; + metrics.runAppAttempt(applicationId, user); + } + + // Container is completed. Skip recovering resources. + if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { + return; + } + + metrics.allocateResources(user, 1, rmContainer.getAllocatedResource(), + false); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java index c51f819..f6fb59a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java @@ -26,6 +26,8 @@ import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; @Evolving @LimitedPrivate("yarn") @@ -60,4 +62,13 @@ boolean hasAccess(QueueACL acl, UserGroupInformation user); public ActiveUsersManager getActiveUsersManager(); + + /** + * Recover the state of the queue + * @param clusterResource the resource of the cluster + * @param application the application for which the container was allocated + * @param container the container that was recovered. + */ + public void recoverContainer(Resource clusterResource, + SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index fc7e047..a5a1cf3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.util.resource.Resources; @@ -499,5 +500,25 @@ public synchronized void move(Queue newQueue) { appSchedulingInfo.move(newQueue); this.queue = newQueue; - } + } + + public synchronized void recoverContainer(RMContainer rmContainer) { + // recover app scheduling info + appSchedulingInfo.recoverContainer(rmContainer); + + if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { + return; + } + LOG.info("SchedulerAttempt " + getApplicationAttemptId() + + " is recovering container " + rmContainer.getContainerId()); + liveContainers.put(rmContainer.getContainerId(), rmContainer); + Resources.addTo(currentConsumption, rmContainer.getContainer() + .getResource()); + // resourceLimit: updated when LeafQueue#recoverContainer#allocateResource + // is called. + +// newlyAllocatedContainers.add(rmContainer); +// schedulingOpportunities +// lastScheduledContainer + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 85d016b..dbf987e 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -27,7 +27,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; @@ -35,10 +34,10 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.util.resource.Resources; -import com.google.common.base.Preconditions; /** * Represents a YARN Cluster Node from the viewpoint of the scheduler. @@ -119,13 +118,10 @@ public String getRackName() { * The Scheduler has allocated containers on this node to the given * application. * - * @param applicationId - * application * @param rmContainer * allocated container */ - public synchronized void allocateContainer(ApplicationId applicationId, - RMContainer rmContainer) { + public synchronized void allocateContainer(RMContainer rmContainer) { Container container = rmContainer.getContainer(); deductAvailableResource(container.getResource()); ++numContainers; @@ -166,8 +162,8 @@ public Resource getTotalResource() { return this.totalResourceCapability; } - private synchronized boolean isValidContainer(Container c) { - if (launchedContainers.containsKey(c.getId())) { + public synchronized boolean isValidContainer(ContainerId containerId) { + if (launchedContainers.containsKey(containerId)) { return true; } return false; @@ -185,7 +181,7 @@ private synchronized void updateResource(Container container) { * container to be released */ public synchronized void releaseContainer(Container container) { - if (!isValidContainer(container)) { + if (!isValidContainer(container.getId())) { LOG.error("Invalid container released " + container); return; } @@ -274,4 +270,12 @@ public synchronized RMContainer getReservedContainer() { // we can only adjust available resource if total resource is changed. Resources.addTo(this.availableResource, deltaResource); } + + + public void recoverContainer(RMContainer rmContainer) { + if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { + return; + } + allocateContainer(rmContainer); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index f5090ba..ccb71e2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -28,7 +28,6 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueState; @@ -235,15 +234,6 @@ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) public ActiveUsersManager getActiveUsersManager(); /** - * Recover the state of the queue - * @param clusterResource the resource of the cluster - * @param application the application for which the container was allocated - * @param container the container that was recovered. - */ - public void recoverContainer(Resource clusterResource, FiCaSchedulerApp application, - Container container); - - /** * Adds all applications in the queue and its subqueues to the given collection. * @param apps the collection to add the applications to */ diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 9eed61f..9634887 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -55,12 +55,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; @@ -513,8 +509,8 @@ private synchronized void addApplication(ApplicationId applicationId, applications.put(applicationId, application); LOG.info("Accepted application " + applicationId + " from user: " + user + ", in queue: " + queueName); - rmContext.getDispatcher().getEventHandler() - .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); + + notifyAppAccepted(applicationId); } private synchronized void addApplicationAttempt( @@ -537,9 +533,8 @@ private synchronized void addApplicationAttempt( LOG.info("Added Application Attempt " + applicationAttemptId + " to scheduler from user " + application.getUser() + " in queue " + queue.getQueueName()); - rmContext.getDispatcher().getEventHandler() .handle( - new RMAppAttemptEvent(applicationAttemptId, - RMAppAttemptEventType.ATTEMPT_ADDED)); + + notifyAttemptAdded(applicationAttemptId); } private synchronized void doneApplication(ApplicationId applicationId, @@ -834,6 +829,8 @@ public void handle(SchedulerEvent event) { { NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event; addNode(nodeAddedEvent.getAddedRMNode()); + recoverContainersOnNode(nodeAddedEvent.getContainerReports(), + nodeAddedEvent.getAddedRMNode()); } break; case NODE_REMOVED: diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 5ddb9a4..65938aa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -59,15 +59,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.server.utils.Lock.NoLock; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; +import com.google.common.annotations.VisibleForTesting; + @Private @Unstable public class LeafQueue implements CSQueue { @@ -564,7 +566,8 @@ public String toString() { "numContainers=" + getNumContainers(); } - private synchronized User getUser(String userName) { + @VisibleForTesting + public synchronized User getUser(String userName) { User user = users.get(userName); if (user == null) { user = new User(); @@ -1346,8 +1349,7 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod } // Inform the node - node.allocateContainer(application.getApplicationId(), - allocatedContainer); + node.allocateContainer(allocatedContainer); LOG.info("assignedContainer" + " application attempt=" + application.getApplicationAttemptId() + @@ -1446,7 +1448,7 @@ public void completedContainer(Resource clusterResource, } synchronized void allocateResource(Resource clusterResource, - FiCaSchedulerApp application, Resource resource) { + SchedulerApplicationAttempt application, Resource resource) { // Update queue metrics Resources.addTo(usedResources, resource); CSQueueUtils.updateQueueStatistics( @@ -1530,7 +1532,8 @@ public QueueMetrics getMetrics() { return metrics; } - static class User { + @VisibleForTesting + public static class User { Resource consumed = Resources.createResource(0, 0); int pendingApplications = 0; int activeApplications = 0; @@ -1580,13 +1583,16 @@ public synchronized void releaseContainer(Resource resource) { @Override public void recoverContainer(Resource clusterResource, - FiCaSchedulerApp application, Container container) { + SchedulerApplicationAttempt attempt, RMContainer rmContainer) { + if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { + return; + } // Careful! Locking order is important! synchronized (this) { - allocateResource(clusterResource, application, container.getResource()); + allocateResource(clusterResource, attempt, rmContainer.getContainer() + .getResource()); } - getParent().recoverContainer(clusterResource, application, container); - + getParent().recoverContainer(clusterResource, attempt, rmContainer); } /** @@ -1613,5 +1619,4 @@ public void collectSchedulerApplications( apps.add(app.getApplicationAttemptId()); } } - } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index dba92a6..d83eed3 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -38,7 +38,6 @@ import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; @@ -49,9 +48,11 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -770,13 +771,16 @@ public QueueMetrics getMetrics() { @Override public void recoverContainer(Resource clusterResource, - FiCaSchedulerApp application, Container container) { + SchedulerApplicationAttempt attempt, RMContainer rmContainer) { + if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { + return; + } // Careful! Locking order is important! synchronized (this) { - allocateResource(clusterResource, container.getResource()); + allocateResource(clusterResource,rmContainer.getContainer().getResource()); } if (parent != null) { - parent.recoverContainer(clusterResource, application, container); + parent.recoverContainer(clusterResource, attempt, rmContainer); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java index 7bab760..5227aac 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java @@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java index c487f48..9d01bbf 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java @@ -18,19 +18,34 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; +import java.util.List; + +import org.apache.hadoop.yarn.server.api.protocolrecords.ContainerRecoveryReport; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; public class NodeAddedSchedulerEvent extends SchedulerEvent { private final RMNode rmNode; + private final List containerReports; public NodeAddedSchedulerEvent(RMNode rmNode) { super(SchedulerEventType.NODE_ADDED); this.rmNode = rmNode; + this.containerReports = null; + } + + public NodeAddedSchedulerEvent(RMNode rmNode, + List containerReports) { + super(SchedulerEventType.NODE_ADDED); + this.rmNode = rmNode; + this.containerReports = containerReports; } public RMNode getAddedRMNode() { return rmNode; } + public List getContainerReports() { + return containerReports; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java index 9ed5179..5376d15 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java @@ -31,8 +31,6 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; @@ -259,8 +257,7 @@ private Resource assignContainer(FSSchedulerNode node, } // Inform the node - node.allocateContainer(app.getApplicationId(), - allocatedContainer); + node.allocateContainer(allocatedContainer); return container.getResource(); } else { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index e842a6a..b396b07 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -33,8 +33,10 @@ import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; @@ -254,4 +256,11 @@ public int getNumRunnableApps() { public ActiveUsersManager getActiveUsersManager() { return activeUsersManager; } + + @Override + public void recoverContainer(Resource clusterResource, + SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) { + // TODO Auto-generated method stub + + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index 427cb86..67e5104 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -33,7 +33,9 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @Private @Unstable @@ -200,4 +202,11 @@ public ActiveUsersManager getActiveUsersManager() { // Should never be called since all applications are submitted to LeafQueues return null; } + + @Override + public void recoverContainer(Resource clusterResource, + SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) { + // TODO Auto-generated method stub + + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 5eeda64..0b6f239 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -58,12 +58,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; @@ -598,8 +594,7 @@ protected synchronized void addApplication(ApplicationId applicationId, LOG.info("Accepted application " + applicationId + " from user: " + user + ", in queue: " + queueName + ", currently num of applications: " + applications.size()); - rmContext.getDispatcher().getEventHandler() - .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); + notifyAppAccepted(applicationId); } /** @@ -635,9 +630,7 @@ protected synchronized void addApplicationAttempt( LOG.info("Added Application Attempt " + applicationAttemptId + " to scheduler from user: " + user); - rmContext.getDispatcher().getEventHandler().handle( - new RMAppAttemptEvent(applicationAttemptId, - RMAppAttemptEventType.ATTEMPT_ADDED)); + notifyAttemptAdded(applicationAttemptId); } /** diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index bc3441b..45a11fa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -56,11 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; @@ -76,6 +72,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; @@ -178,6 +175,17 @@ public boolean hasAccess(QueueACL acl, UserGroupInformation user) { public ActiveUsersManager getActiveUsersManager() { return activeUsersManager; } + + @Override + public void recoverContainer(Resource clusterResource, + SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) { + if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { + return; + } + increaseUsedResources(rmContainer); + updateAppHeadRoom(schedulerAttempt); + updateAvailableResourcesMetrics(); + } }; @Override @@ -328,8 +336,8 @@ public synchronized void addApplication(ApplicationId applicationId, metrics.submitApp(user); LOG.info("Accepted application " + applicationId + " from user: " + user + ", currently num of applications: " + applications.size()); - rmContext.getDispatcher().getEventHandler() - .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); + + notifyAppAccepted(applicationId); } @VisibleForTesting @@ -353,9 +361,8 @@ public synchronized void addApplication(ApplicationId applicationId, metrics.submitAppAttempt(user); LOG.info("Added Application Attempt " + appAttemptId + " to scheduler from user " + application.getUser()); - rmContext.getDispatcher().getEventHandler().handle( - new RMAppAttemptEvent(appAttemptId, - RMAppAttemptEventType.ATTEMPT_ADDED)); + + notifyAttemptAdded(appAttemptId); } private synchronized void doneApplication(ApplicationId applicationId, @@ -465,7 +472,7 @@ private void assignContainers(FiCaSchedulerNode node) { if (attempt == null) { continue; } - attempt.setHeadroom(Resources.subtract(clusterResource, usedResource)); + updateAppHeadRoom(attempt); } } @@ -636,11 +643,10 @@ private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application application.allocate(type, node, priority, request, container); // Inform the node - node.allocateContainer(application.getApplicationId(), - rmContainer); + node.allocateContainer(rmContainer); // Update usage for this container - Resources.addTo(usedResource, capability); + increaseUsedResources(rmContainer); } } @@ -684,9 +690,22 @@ private synchronized void nodeUpdate(RMNode rmNode) { LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = " + node.getAvailableResource()); } - - metrics.setAvailableResourcesToQueue( - Resources.subtract(clusterResource, usedResource)); + + updateAvailableResourcesMetrics(); + } + + private void increaseUsedResources(RMContainer rmContainer) { + Resources.addTo(usedResource, rmContainer.getAllocatedResource()); + } + + private void updateAppHeadRoom(SchedulerApplicationAttempt schedulerAttempt) { + schedulerAttempt.setHeadroom(Resources.subtract(clusterResource, + usedResource)); + } + + private void updateAvailableResourcesMetrics() { + metrics.setAvailableResourcesToQueue(Resources.subtract(clusterResource, + usedResource)); } @Override @@ -696,6 +715,9 @@ public void handle(SchedulerEvent event) { { NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event; addNode(nodeAddedEvent.getAddedRMNode()); + recoverContainersOnNode(nodeAddedEvent.getContainerReports(), + nodeAddedEvent.getAddedRMNode()); + } break; case NODE_REMOVED: @@ -900,4 +922,8 @@ public synchronized boolean checkAccess(UserGroupInformation callerUGI, return null; } } + + public Resource getUsedResource() { + return usedResource; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java index ce5dd96..d05bd87 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.yarn.server.resourcemanager; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; import java.io.IOException; import java.util.ArrayList; @@ -54,6 +57,9 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.Task.State; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -170,6 +176,15 @@ public synchronized void submit() throws IOException, YarnException { AppAddedSchedulerEvent addAppEvent = new AppAddedSchedulerEvent(this.applicationId, this.queue, "user"); scheduler.handle(addAppEvent); + + // fake the attempt to avoid NPE in test. + RMAppAttempt attempt = mock(RMAppAttempt.class); + when(attempt.getState()).thenReturn(RMAppAttemptState.SUBMITTED); + RMApp spyApp = + spy(resourceManager.getRMContext().getRMApps().get(applicationId)); + when(spyApp.getRMAppAttempt(applicationAttemptId)).thenReturn(attempt); + resourceManager.getRMContext().getRMApps().put(applicationId, spyApp); + AppAttemptAddedSchedulerEvent addAttemptEvent = new AppAttemptAddedSchedulerEvent(this.applicationAttemptId, false); scheduler.handle(addAttemptEvent); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index cfd05f9..d28043d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -23,8 +23,6 @@ import java.util.ArrayList; import java.util.List; -import org.junit.Assert; - import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; @@ -34,6 +32,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; @@ -44,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.util.Records; +import org.junit.Assert; public class MockAM { @@ -140,6 +140,24 @@ public AllocateResponse allocate( return allocate(reqs, releases); } + public List allocateContainers(int numContainers, + int memory, MockNM nm) throws Exception { + allocate(nm.getNodeId().getHost(), memory, numContainers, + new ArrayList()); + List containers = new ArrayList(); + int timeoutSecs = 0; + do { + nm.nodeHeartbeat(true); + containers.addAll(allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers()); + Thread.sleep(200); + System.out.println("Attempt " + this.attemptId + " is waiting for " + + numContainers + " containers to be allocated. Currently has " + + containers.size() + " containers allocated."); + } while (containers.size() != numContainers && timeoutSecs++ < 60); + return containers; + } + public List createReq(String[] hosts, int memory, int priority, int containers) throws Exception { List reqs = new ArrayList(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 1dcac06..cf8b408 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -32,7 +31,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.api.protocolrecords.ContainerRecoveryReport; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; @@ -105,14 +104,14 @@ public RegisterNodeManagerResponse registerNode() throws Exception { } public RegisterNodeManagerResponse registerNode( - List containerStatus) throws Exception{ + List containerReports) throws Exception{ RegisterNodeManagerRequest req = Records.newRecord( RegisterNodeManagerRequest.class); req.setNodeId(nodeId); req.setHttpPort(httpPort); Resource resource = BuilderUtils.newResource(memory, vCores); req.setResource(resource); - req.setContainerStatuses(containerStatus); + req.setContainerRecoveryReports(containerReports); req.setNMVersion(version); RegisterNodeManagerResponse registrationResponse = resourceTracker.registerNodeManager(req); @@ -185,4 +184,11 @@ public NodeHeartbeatResponse nodeHeartbeat(Map containers = attempt.getJustFinishedContainers(); + System.out.println("Received completed containers " + containers); + for (ContainerStatus container : containers) { + if (container.getContainerId().equals( + completedContainer.getContainerId())) { + return; + } + } + Thread.sleep(200); + } + } + public void waitForState(MockNM nm, ContainerId containerId, RMContainerState containerState) throws Exception { RMContainer container = getResourceScheduler().getRMContainer(containerId); int timeoutSecs = 0; - while(container == null && timeoutSecs++ < 20) { + while(container == null && timeoutSecs++ < 100) { nm.nodeHeartbeat(true); container = getResourceScheduler().getRMContainer(containerId); System.out.println("Waiting for container " + containerId + " to be allocated."); @@ -333,7 +354,7 @@ public MockNM registerNode(String nodeIdStr, int memory, int vCores) public void sendNodeStarted(MockNM nm) throws Exception { RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get( nm.getNodeId()); - node.handle(new RMNodeEvent(nm.getNodeId(), RMNodeEventType.STARTED)); + node.handle(new RMNodeStartEvent(nm.getNodeId(), null)); } public void sendNodeLost(MockNM nm) throws Exception { @@ -542,4 +563,12 @@ public ApplicationReport getApplicationReport(ApplicationId appId) .newInstance(appId)); return response.getApplicationReport(); } + + // Explicitly reset queue metrics for testing. + @SuppressWarnings("static-access") + public void clearQueueMetrics(RMApp app) { + ((AbstractYarnScheduler) getResourceScheduler()) + .getSchedulerApplications().get(app.getApplicationId()).getQueue() + .getMetrics().clearQueueMetrics(); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java index b80a6bc..36153de 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java @@ -43,10 +43,11 @@ public class TestMoveApplication { private ResourceManager resourceManager = null; private static boolean failMove; - + private Configuration conf; + @Before public void setUp() throws Exception { - Configuration conf = new YarnConfiguration(); + conf = new YarnConfiguration(); conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoSchedulerWithMove.class, FifoSchedulerWithMove.class); conf.set(YarnConfiguration.YARN_ADMIN_ACL, " "); @@ -119,28 +120,23 @@ public void testMoveTooLate() throws Exception { } } - @Test (timeout = 5000) - public void testMoveSuccessful() throws Exception { - // Submit application - Application application = new Application("user1", resourceManager); - ApplicationId appId = application.getApplicationId(); - application.submit(); - - // Wait for app to be accepted - RMApp app = resourceManager.rmContext.getRMApps().get(appId); - while (app.getState() != RMAppState.ACCEPTED) { - Thread.sleep(100); - } - - ClientRMService clientRMService = resourceManager.getClientRMService(); + @Test (timeout = 10000) + public + void testMoveSuccessful() throws Exception { + MockRM rm1 = new MockRM(conf); + rm1.start(); + RMApp app = rm1.submitApp(1024); + ClientRMService clientRMService = rm1.getClientRMService(); // FIFO scheduler does not support moves - clientRMService.moveApplicationAcrossQueues( - MoveApplicationAcrossQueuesRequest.newInstance(appId, "newqueue")); - - RMApp rmApp = resourceManager.getRMContext().getRMApps().get(appId); + clientRMService + .moveApplicationAcrossQueues(MoveApplicationAcrossQueuesRequest + .newInstance(app.getApplicationId(), "newqueue")); + + RMApp rmApp = rm1.getRMContext().getRMApps().get(app.getApplicationId()); assertEquals("newqueue", rmApp.getQueue()); + rm1.stop(); } - + @Test public void testMoveRejectedByPermissions() throws Exception { failMove = true; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 3bcde8d..6a5ffa9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -21,15 +21,14 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.when; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import org.junit.Assert; - import org.apache.hadoop.util.HostsFileReader; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -49,6 +48,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; @@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; @@ -160,7 +161,7 @@ private RMNodeStatusEvent getMockRMNodeStatusEvent() { @Test (timeout = 5000) public void testExpiredContainer() { // Start the node - node.handle(new RMNodeEvent(null, RMNodeEventType.STARTED)); + node.handle(new RMNodeStartEvent(null, null)); verify(scheduler).handle(any(NodeAddedSchedulerEvent.class)); // Expire a container @@ -188,11 +189,11 @@ public void testExpiredContainer() { @Test (timeout = 5000) public void testContainerUpdate() throws InterruptedException{ //Start the node - node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED)); + node.handle(new RMNodeStartEvent(null, null)); NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1); RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null); - node2.handle(new RMNodeEvent(null,RMNodeEventType.STARTED)); + node2.handle(new RMNodeStartEvent(null, null)); ContainerId completedContainerIdFromNode1 = BuilderUtils.newContainerId( BuilderUtils.newApplicationAttemptId( @@ -248,7 +249,7 @@ public void testContainerUpdate() throws InterruptedException{ @Test (timeout = 5000) public void testStatusChange(){ //Start the node - node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED)); + node.handle(new RMNodeStartEvent(null, null)); //Add info to the queue first node.setNextHeartBeat(false); @@ -464,7 +465,7 @@ private RMNodeImpl getRunningNode(String nmVersion) { RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0, null, ResourceOption.newInstance(capability, RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), nmVersion); - node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED)); + node.handle(new RMNodeStartEvent(node.getNodeID(), null)); Assert.assertEquals(NodeState.RUNNING, node.getState()); return node; } @@ -495,7 +496,7 @@ public void testAdd() { int initialUnhealthy = cm.getUnhealthyNMs(); int initialDecommissioned = cm.getNumDecommisionedNMs(); int initialRebooted = cm.getNumRebootedNMs(); - node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED)); + node.handle(new RMNodeStartEvent(node.getNodeID(), null)); Assert.assertEquals("Active Nodes", initialActive + 1, cm.getNumActiveNMs()); Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs()); Assert.assertEquals("Unhealthy Nodes", diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index 49eff8b..34622fe 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -30,6 +30,7 @@ import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; @@ -67,13 +68,13 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; -import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.server.api.protocolrecords.ContainerRecoveryReport; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; @@ -303,13 +304,11 @@ public void testRMRestart() throws Exception { nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService()); nm2 = new MockNM("127.0.0.2:5678", 15120, rm2.getResourceTrackerService()); - List containerStatuses = new ArrayList(); - ContainerStatus containerStatus = - BuilderUtils.newContainerStatus(BuilderUtils.newContainerId(loadedApp1 - .getCurrentAppAttempt().getAppAttemptId(), 1), - ContainerState.COMPLETE, "Killed AM container", 143); - containerStatuses.add(containerStatus); - nm1.registerNode(containerStatuses); + ContainerRecoveryReport report = + TestWorkPreservingRMRestart + .createContainerRecoveryReport(loadedApp1.getCurrentAppAttempt() + .getAppAttemptId(), 1, ContainerState.COMPLETE); + nm1.registerNode(Arrays.asList(report)); nm2.registerNode(); rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED); @@ -510,14 +509,11 @@ public void testRMRestartWaitForPreviousAMToFinish() throws Exception { Assert.assertEquals(RMAppAttemptState.LAUNCHED, rmApp.getAppAttempts().get(am2.getApplicationAttemptId()) .getAppAttemptState()); - - List containerStatuses = new ArrayList(); - ContainerStatus containerStatus = - BuilderUtils.newContainerStatus( - BuilderUtils.newContainerId(am2.getApplicationAttemptId(), 1), - ContainerState.COMPLETE, "Killed AM container", 143); - containerStatuses.add(containerStatus); - nm1.registerNode(containerStatuses); + + ContainerRecoveryReport report = + TestWorkPreservingRMRestart.createContainerRecoveryReport( + am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + nm1.registerNode(Arrays.asList(report)); rm2.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED); launchAM(rmApp, rm2, nm1); Assert.assertEquals(3, rmApp.getAppAttempts().size()); @@ -1671,13 +1667,12 @@ public void testQueueMetricsOnRMRestart() throws Exception { am1.allocate(new ArrayList(), new ArrayList()); nm1.nodeHeartbeat(true); nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService()); - List containerStatuses = new ArrayList(); - ContainerStatus containerStatus = - BuilderUtils.newContainerStatus(BuilderUtils.newContainerId(loadedApp1 - .getCurrentAppAttempt().getAppAttemptId(), 1), - ContainerState.COMPLETE, "Killed AM container", 143); - containerStatuses.add(containerStatus); - nm1.registerNode(containerStatuses); + + ContainerRecoveryReport report = + TestWorkPreservingRMRestart + .createContainerRecoveryReport(loadedApp1.getCurrentAppAttempt() + .getAppAttemptId(), 1, ContainerState.COMPLETE); + nm1.registerNode(Arrays.asList(report)); while (loadedApp1.getAppAttempts().size() != 2) { Thread.sleep(200); } @@ -1801,12 +1796,10 @@ protected void serviceStart() throws Exception { // ResourceTrackerService is started. super.serviceStart(); nm1.setResourceTrackerService(getResourceTrackerService()); - List status = new ArrayList(); - ContainerId amContainer = - ContainerId.newInstance(am0.getApplicationAttemptId(), 1); - status.add(ContainerStatus.newInstance(amContainer, - ContainerState.COMPLETE, "AM container exit", 143)); - nm1.registerNode(status); + ContainerRecoveryReport report = + TestWorkPreservingRMRestart.createContainerRecoveryReport( + am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + nm1.registerNode(Arrays.asList(report)); } }; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java new file mode 100644 index 0000000..8f500c6 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java @@ -0,0 +1,556 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.api.protocolrecords.ContainerRecoveryReport; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@SuppressWarnings({"rawtypes", "unchecked"}) +@RunWith(value = Parameterized.class) +public class TestWorkPreservingRMRestart { + + private YarnConfiguration conf; + private Class schedulerClass; + MockRM rm1 = null; + MockRM rm2 = null; + + @Before + public void setup() throws UnknownHostException { + Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + conf = new YarnConfiguration(); + UserGroupInformation.setConfiguration(conf); + conf.set(YarnConfiguration.RECOVERY_ENABLED, "true"); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + conf.setClass(YarnConfiguration.RM_SCHEDULER, schedulerClass, + ResourceScheduler.class); + DefaultMetricsSystem.setMiniClusterMode(true); + } + + @After + public void tearDown() { + if (rm1 != null) { + rm1.stop(); + } + if (rm2 != null) { + rm2.stop(); + } + } + + @Parameterized.Parameters + public static Collection getTestParameters() { + return Arrays.asList(new Object[][] { { CapacityScheduler.class }, + { FifoScheduler.class } }); + } + + public TestWorkPreservingRMRestart(Class schedulerClass) { + this.schedulerClass = schedulerClass; + } + + // Test common scheduler state including SchedulerAttempt, SchedulerNode, + // AppSchedulingInfo can be reconstructed via the container recovery reports + // on NM re-registration. + // Also test scheduler specific changes: i.e. Queue recovery- + // CSQueue/FSQueue/FifoQueue recovery respectively. + // Test Strategy: send 3 container recovery reports(AMContainer, running + // container, completed container) on NM re-registration, check the states of + // SchedulerAttempt, SchedulerNode etc. are updated accordingly. + @Test(timeout = 20000) + public void testSchedulerRecovery() throws Exception { + conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true); + conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, + DominantResourceCalculator.class.getName()); + + int containerMemory = 1024; + Resource containerResource = Resource.newInstance(containerMemory, 1); + + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + nm1.registerNode(); + RMApp app1 = rm1.submitApp(200); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // clear queue metrics + rm1.clearQueueMetrics(app1); + + // Re-start RM + rm2 = new MockRM(conf, memStore); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + // recover app + RMApp recoveredApp1 = + rm2.getRMContext().getRMApps().get(app1.getApplicationId()); + RMAppAttempt loadedAttempt1 = recoveredApp1.getCurrentAppAttempt(); + ContainerRecoveryReport amContainer = + createContainerRecoveryReport(am1.getApplicationAttemptId(), 1, + ContainerState.RUNNING); + ContainerRecoveryReport runningContainer = + createContainerRecoveryReport(am1.getApplicationAttemptId(), 2, + ContainerState.RUNNING); + ContainerRecoveryReport completedContainer = + createContainerRecoveryReport(am1.getApplicationAttemptId(), 3, + ContainerState.COMPLETE); + + nm1.registerNode(Arrays.asList(amContainer, runningContainer, + completedContainer)); + + // Wait for RM to settle down on recovering containers; + Thread.sleep(2000); + // check RMContainers are re-recreated and the container state is correct. + rm2.waitForState(nm1, amContainer.getContainerId(), + RMContainerState.RUNNING); + rm2.waitForState(nm1, runningContainer.getContainerId(), + RMContainerState.RUNNING); + rm2.waitForContainerToComplete(loadedAttempt1, completedContainer); + + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm2.getResourceScheduler(); + SchedulerNode schedulerNode1 = scheduler.getSchedulerNode(nm1.getNodeId()); + + // ********* check scheduler node state.******* + // 2 running containers. + Resource usedResources = Resources.multiply(containerResource, 2); + Resource nmResource = + Resource.newInstance(nm1.getMemory(), nm1.getvCores()); + + assertTrue(schedulerNode1.isValidContainer(amContainer.getContainerId())); + assertTrue(schedulerNode1.isValidContainer(runningContainer + .getContainerId())); + assertFalse(schedulerNode1.isValidContainer(completedContainer + .getContainerId())); + // 2 launched containers, 1 completed container + assertEquals(2, schedulerNode1.getNumContainers()); + + assertEquals(Resources.subtract(nmResource, usedResources), + schedulerNode1.getAvailableResource()); + assertEquals(usedResources, schedulerNode1.getUsedResource()); + Resource availableResources = Resources.subtract(nmResource, usedResources); + + // ***** check queue state based on the underlying scheduler ******** + Map schedulerApps = + ((AbstractYarnScheduler) rm2.getResourceScheduler()) + .getSchedulerApplications(); + SchedulerApplication schedulerApp = + schedulerApps.get(recoveredApp1.getApplicationId()); + + if (schedulerClass.equals(CapacityScheduler.class)) { + checkCSQueue(rm2, schedulerApp, nmResource, nmResource, usedResources, 2); + } else if (schedulerClass.equals(FifoScheduler.class)) { + checkFifoQueue(schedulerApp, usedResources, availableResources); + } + + // *********** check scheduler attempt state.******** + SchedulerApplicationAttempt schedulerAttempt = + schedulerApp.getCurrentAppAttempt(); + assertTrue(schedulerAttempt.getLiveContainers().contains( + scheduler.getRMContainer(amContainer.getContainerId()))); + assertTrue(schedulerAttempt.getLiveContainers().contains( + scheduler.getRMContainer(runningContainer.getContainerId()))); + assertEquals(schedulerAttempt.getCurrentConsumption(), usedResources); + assertEquals(availableResources, schedulerAttempt.getHeadroom()); + + // *********** check appSchedulingInfo state *********** + assertEquals(4, schedulerAttempt.getNewContainerId()); + } + + private void checkCSQueue(MockRM rm, + SchedulerApplication app, + Resource clusterResource, Resource queueResource, Resource usedResource, + int numContainers) + throws Exception { + checkCSLeafQueue(rm2, app, clusterResource, queueResource, usedResource, + numContainers); + + LeafQueue queue = (LeafQueue) app.getQueue(); + Resource availableResources = Resources.subtract(queueResource, usedResource); + // ************* check Queue metrics ************ + QueueMetrics queueMetrics = queue.getMetrics(); + asserteMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(), + availableResources.getVirtualCores(), usedResource.getMemory(), + usedResource.getVirtualCores()); + + // ************ check user metrics *********** + QueueMetrics userMetrics = + queueMetrics.getUserMetrics(app.getUser()); + asserteMetrics(userMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(), + availableResources.getVirtualCores(), usedResource.getMemory(), + usedResource.getVirtualCores()); + } + + private void checkCSLeafQueue(MockRM rm, + SchedulerApplication app, + Resource clusterResource, Resource queueResource, Resource usedResource, + int numContainers) { + LeafQueue leafQueue = (LeafQueue) app.getQueue(); + // assert queue used resources. + assertEquals(usedResource, leafQueue.getUsedResources()); + assertEquals(numContainers, leafQueue.getNumContainers()); + + ResourceCalculator calc = + ((CapacityScheduler) rm.getResourceScheduler()).getResourceCalculator(); + float usedCapacity = + Resources.divide(calc, clusterResource, usedResource, queueResource); + // assert queue used capacity + assertEquals(usedCapacity, leafQueue.getUsedCapacity(), 1e-8); + float absoluteUsedCapacity = + Resources.divide(calc, clusterResource, usedResource, clusterResource); + // assert queue absolute capacity + assertEquals(absoluteUsedCapacity, leafQueue.getAbsoluteUsedCapacity(), + 1e-8); + // assert user consumed resources. + assertEquals(usedResource, leafQueue.getUser(app.getUser()) + .getConsumedResources()); + } + + private void checkFifoQueue(SchedulerApplication schedulerApp, + Resource usedResources, Resource availableResources) throws Exception { + FifoScheduler scheduler = (FifoScheduler) rm2.getResourceScheduler(); + // ************ check cluster used Resources ******** + assertEquals(usedResources, scheduler.getUsedResource()); + + // ************ check app headroom **************** + SchedulerApplicationAttempt schedulerAttempt = + schedulerApp.getCurrentAppAttempt(); + assertEquals(availableResources, schedulerAttempt.getHeadroom()); + + // ************ check queue metrics **************** + QueueMetrics queueMetrics = scheduler.getRootQueueMetrics(); + asserteMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(), + availableResources.getVirtualCores(), usedResources.getMemory(), + usedResources.getVirtualCores()); + } + + // create 3 container reports for AM + public static List + createContainerRecoveryReportForApp(MockAM am) { + List list = + new ArrayList(); + ContainerRecoveryReport amContainer = + createContainerRecoveryReport(am.getApplicationAttemptId(), 1, + ContainerState.RUNNING); + ContainerRecoveryReport runningContainer = + createContainerRecoveryReport(am.getApplicationAttemptId(), 2, + ContainerState.RUNNING); + ContainerRecoveryReport completedContainer = + createContainerRecoveryReport(am.getApplicationAttemptId(), 3, + ContainerState.COMPLETE); + list.add(amContainer); + list.add(runningContainer); + list.add(completedContainer); + return list; + } + + private static final String R = "Default"; + private static final String A = "QueueA"; + private static final String B = "QueueB"; + private static final String USER_1 = "user1"; + private static final String USER_2 = "user2"; + + private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) { + conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { R }); + final String Q_R = CapacitySchedulerConfiguration.ROOT + "." + R; + conf.setCapacity(Q_R, 100); + final String Q_A = Q_R + "." + A; + final String Q_B = Q_R + "." + B; + conf.setQueues(Q_R, new String[] {A, B}); + conf.setCapacity(Q_A, 50); + conf.setCapacity(Q_B, 50); + conf.setDouble(CapacitySchedulerConfiguration + .MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.5f); + } + + // Test CS recovery with multi-level queues and multi-users: + // 1. setup 2 NMs each with 8GB memory; + // 2. setup 2 level queues: Default -> (QueueA, QueueB) + // 3. User1 submits 2 apps on QueueA + // 4. User2 submits 1 app on QueueB + // 5. AM and each container has 1GB memory + // 6. Restart RM. + // 7. nm1 re-syncs back containers belong to user1 + // 8. nm2 re-syncs back containers belong to user2. + // 9. Assert the parent queue and 2 leaf queues state and the metrics. + // 10. Assert each user's consumption inside the queue. + @Test (timeout = 30000) + public void testCapacitySchedulerRecovery() throws Exception { + if (!schedulerClass.equals(CapacityScheduler.class)) { + return; + } + conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true); + conf.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, + DominantResourceCalculator.class.getName()); + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(conf); + setupQueueConfiguration(csConf); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(csConf); + rm1 = new MockRM(csConf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + MockNM nm2 = + new MockNM("127.1.1.1:4321", 8192, rm1.getResourceTrackerService()); + nm1.registerNode(); + nm2.registerNode(); + RMApp app1_1 = rm1.submitApp(1024, "app1_1", USER_1, null, A); + MockAM am1_1 = MockRM.launchAndRegisterAM(app1_1, rm1, nm1); + RMApp app1_2 = rm1.submitApp(1024, "app1_2", USER_1, null, A); + MockAM am1_2 = MockRM.launchAndRegisterAM(app1_2, rm1, nm2); + + RMApp app2 = rm1.submitApp(1024, "app2", USER_2, null, B); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2); + + // clear queue metrics + rm1.clearQueueMetrics(app1_1); + rm1.clearQueueMetrics(app1_2); + rm1.clearQueueMetrics(app2); + + // Re-start RM + rm2 = new MockRM(csConf, memStore); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + nm2.setResourceTrackerService(rm2.getResourceTrackerService()); + + List am1_1Containers = + createContainerRecoveryReportForApp(am1_1); + List am1_2Containers = + createContainerRecoveryReportForApp(am1_2); + am1_1Containers.addAll(am1_2Containers); + nm1.registerNode(am1_1Containers); + + List am2Containers = + createContainerRecoveryReportForApp(am2); + nm2.registerNode(am2Containers); + + // Wait for RM to settle down on recovering containers; + Thread.sleep(3000); + + // Calculate each queue's resource usage. + Resource containerResource = Resource.newInstance(1024, 1); + Resource nmResource = + Resource.newInstance(nm1.getMemory(), nm1.getvCores()); + Resource clusterResource = Resources.multiply(nmResource, 2); + Resource q1Resource = Resources.multiply(clusterResource, 0.5); + Resource q2Resource = Resources.multiply(clusterResource, 0.5); + Resource q1UsedResource = Resources.multiply(containerResource, 4); + Resource q2UsedResource = Resources.multiply(containerResource, 2); + Resource totalUsedResource = Resources.add(q1UsedResource, q2UsedResource); + Resource q1availableResources = + Resources.subtract(q1Resource, q1UsedResource); + Resource q2availableResources = + Resources.subtract(q2Resource, q2UsedResource); + Resource totalAvailableResource = + Resources.add(q1availableResources, q2availableResources); + + Map schedulerApps = + ((AbstractYarnScheduler) rm2.getResourceScheduler()) + .getSchedulerApplications(); + SchedulerApplication schedulerApp1_1 = + schedulerApps.get(app1_1.getApplicationId()); + + // assert queue A state. + checkCSLeafQueue(rm2, schedulerApp1_1, clusterResource, q1Resource, + q1UsedResource, 4); + QueueMetrics queue1Metrics = schedulerApp1_1.getQueue().getMetrics(); + asserteMetrics(queue1Metrics, 2, 0, 2, 0, 4, + q1availableResources.getMemory(), q1availableResources.getVirtualCores(), + q1UsedResource.getMemory(), q1UsedResource.getVirtualCores()); + + // assert queue B state. + SchedulerApplication schedulerApp2 = + schedulerApps.get(app2.getApplicationId()); + checkCSLeafQueue(rm2, schedulerApp2, clusterResource, q2Resource, + q2UsedResource, 2); + QueueMetrics queue2Metrics = schedulerApp2.getQueue().getMetrics(); + asserteMetrics(queue2Metrics, 1, 0, 1, 0, 2, + q2availableResources.getMemory(), q2availableResources.getVirtualCores(), + q2UsedResource.getMemory(), q2UsedResource.getVirtualCores()); + + // assert parent queue state. + LeafQueue leafQueue = (LeafQueue) schedulerApp2.getQueue(); + ParentQueue parentQueue = (ParentQueue) leafQueue.getParent(); + checkParentQueue(parentQueue, 6, totalUsedResource, (float) 6 / 16, + (float) 6 / 16); + asserteMetrics(parentQueue.getMetrics(), 3, 0, 3, 0, 6, + totalAvailableResource.getMemory(), + totalAvailableResource.getVirtualCores(), totalUsedResource.getMemory(), + totalUsedResource.getVirtualCores()); + } + + private void checkParentQueue(ParentQueue parentQueue, int numContainers, + Resource usedResource, float UsedCapacity, float absoluteUsedCapacity) { + assertEquals(numContainers, parentQueue.getNumContainers()); + assertEquals(usedResource, parentQueue.getUsedResources()); + assertEquals(UsedCapacity, parentQueue.getUsedCapacity(), 1e-8); + assertEquals(absoluteUsedCapacity, parentQueue.getAbsoluteUsedCapacity(), 1e-8); + } + + // Test RM shuts down, in the meanwhile, AM fails. Restarted RM scheduler + // should not recover the containers that belong to the failed AM. + @Test(timeout = 20000) + public void testAMfailedBetweenRMRestart() throws Exception { + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + nm1.registerNode(); + RMApp app1 = rm1.submitApp(200); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + rm2 = new MockRM(conf, memStore); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + + ContainerRecoveryReport amContainer = + createContainerRecoveryReport(am1.getApplicationAttemptId(), 1, + ContainerState.COMPLETE); + ContainerRecoveryReport runningContainer = + createContainerRecoveryReport(am1.getApplicationAttemptId(), 2, + ContainerState.RUNNING); + ContainerRecoveryReport completedContainer = + createContainerRecoveryReport(am1.getApplicationAttemptId(), 3, + ContainerState.COMPLETE); + nm1.registerNode(Arrays.asList(amContainer, runningContainer, + completedContainer)); + rm2.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED); + // Wait for RM to settle down on recovering containers; + Thread.sleep(2000); + + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm2.getResourceScheduler(); + // Previous AM failed, The failed AM should once again release the + // just-recovered containers. + assertNull(scheduler.getRMContainer(runningContainer.getContainerId())); + assertNull(scheduler.getRMContainer(completedContainer.getContainerId())); + } + + // Apps already completed before RM restart. Restarted RM scheduler should not + // recover containers for completed apps. + @Test(timeout = 20000) + public void testContainersNotRecoveredForCompletedApps() throws Exception { + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); + nm1.registerNode(); + RMApp app1 = rm1.submitApp(200); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + MockRM.finishAMAndVerifyAppState(app1, rm1, nm1, am1); + + rm2 = new MockRM(conf, memStore); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + ContainerRecoveryReport runningContainer = + createContainerRecoveryReport(am1.getApplicationAttemptId(), 2, + ContainerState.RUNNING); + ContainerRecoveryReport completedContainer = + createContainerRecoveryReport(am1.getApplicationAttemptId(), 3, + ContainerState.COMPLETE); + nm1.registerNode(Arrays.asList(runningContainer, completedContainer)); + RMApp recoveredApp1 = + rm2.getRMContext().getRMApps().get(app1.getApplicationId()); + assertEquals(RMAppState.FINISHED, recoveredApp1.getState()); + + // Wait for RM to settle down on recovering containers; + Thread.sleep(2000); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm2.getResourceScheduler(); + // scheduler should not recover containers for finished apps. + assertNull(scheduler.getRMContainer(runningContainer.getContainerId())); + assertNull(scheduler.getRMContainer(completedContainer.getContainerId())); + } + + public static ContainerRecoveryReport createContainerRecoveryReport( + ApplicationAttemptId appAttemptId, int id, ContainerState containerState) { + ContainerId containerId = ContainerId.newInstance(appAttemptId, id); + ContainerRecoveryReport containerReport = + ContainerRecoveryReport.newInstance(containerId, containerState, + Resource.newInstance(1024, 1), "recover container", 0); + return containerReport; + } + + private void asserteMetrics(QueueMetrics qm, int appsSubmitted, + int appsPending, int appsRunning, int appsCompleted, + int allocatedContainers, int availableMB, int availableVirtualCores, + int allocatedMB, int allocatedVirtualCores) { + assertEquals(appsSubmitted, qm.getAppsSubmitted()); + assertEquals(appsPending, qm.getAppsPending()); + assertEquals(appsRunning, qm.getAppsRunning()); + assertEquals(appsCompleted, qm.getAppsCompleted()); + assertEquals(allocatedContainers, qm.getAllocatedContainers()); + assertEquals(availableMB, qm.getAvailableMB()); + assertEquals(availableVirtualCores, qm.getAvailableVirtualCores()); + assertEquals(allocatedMB, qm.getAllocatedMB()); + assertEquals(allocatedVirtualCores, qm.getAllocatedVirtualCores()); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java index 460f35e..8980fdc 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.net.InetSocketAddress; import java.security.PrivilegedAction; @@ -56,6 +57,7 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS; import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -387,10 +389,13 @@ public void testCreatePreemptedContainerStatus() { public static SchedulerApplication verifyAppAddedAndRemovedFromScheduler( Map> applications, - EventHandler handler, String queueName) + EventHandler handler, String queueName, RMContext rmContext) throws Exception { ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); + RMApp rmApp = mock(RMApp.class); + when(rmApp.getState()).thenReturn(RMAppState.SUBMITTED); + rmContext.getRMApps().put(appId, rmApp); AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(appId, queueName, "user"); handler.handle(appAddedEvent); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 1412039..fe01ed9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -53,6 +53,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.Task; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -557,6 +561,15 @@ public void testBlackListNodes() throws Exception { ApplicationId appId = BuilderUtils.newApplicationId(100, 1); ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( appId, 1); + + // fake app/attempt to avoid NPE + RMApp rmApp = mock(RMApp.class); + when(rmApp.getState()).thenReturn(RMAppState.SUBMITTED); + RMAppAttempt attempt = mock(RMAppAttempt.class); + when(attempt.getState()).thenReturn(RMAppAttemptState.SUBMITTED); + when(rmApp.getRMAppAttempt(appAttemptId)).thenReturn(attempt); + rm.getRMContext().getRMApps().put(appId, rmApp); + SchedulerEvent addAppEvent = new AppAddedSchedulerEvent(appId, "default", "user"); cs.handle(addAppEvent); @@ -641,7 +654,7 @@ public void testAddAndRemoveAppFromCapacityScheduler() throws Exception { SchedulerApplication app = TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler( - cs.getSchedulerApplications(), cs, "a1"); + cs.getSchedulerApplications(), cs, "a1", rm.getRMContext()); Assert.assertEquals("a1", app.getQueue().getQueueName()); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 2a26d30..20f72c0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -58,7 +58,9 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; @@ -351,6 +353,17 @@ public void testAppAttemptMetrics() throws Exception { final ApplicationAttemptId appAttemptId_0 = TestUtils .getMockApplicationAttemptId(0, 1); + FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, null, + rmContext); + + // fake app/attempt to avoid NPE. + RMApp rmApp = mock(RMApp.class); + when(rmApp.getState()).thenReturn(RMAppState.SUBMITTED); + RMAppAttempt attempt = mock(RMAppAttempt.class); + when(attempt.getState()).thenReturn(RMAppAttemptState.SUBMITTED); + when(rmApp.getRMAppAttempt(appAttemptId_0)).thenReturn(attempt); + rmContext.getRMApps().put(appAttemptId_0.getApplicationId(), rmApp); + AppAddedSchedulerEvent addAppEvent = new AppAddedSchedulerEvent(appAttemptId_0.getApplicationId(), a.getQueueName(), user_0); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index fe2cb23..9ccaaba 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -2655,7 +2655,8 @@ public void testAddAndRemoveAppFromFairScheduler() throws Exception { (AbstractYarnScheduler) resourceManager .getResourceScheduler(); TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler( - scheduler.getSchedulerApplications(), scheduler, "default"); + scheduler.getSchedulerApplications(), scheduler, "default", + resourceManager.getRMContext()); } @Test diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index f5bfc37..7fb9b26 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -599,7 +599,7 @@ public void testAddAndRemoveAppFromFiFoScheduler() throws Exception { (AbstractYarnScheduler) rm .getResourceScheduler(); TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler( - fs.getSchedulerApplications(), fs, "queue"); + fs.getSchedulerApplications(), fs, "queue", rm.getRMContext()); } private void checkApplicationResourceUsage(int expected,