diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ContainerRecoveryReport.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ContainerRecoveryReport.java
new file mode 100644
index 0000000..2d4031f
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ContainerRecoveryReport.java
@@ -0,0 +1,92 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.Records;
+
+public abstract class ContainerRecoveryReport {
+
+ public static ContainerRecoveryReport newInstance(ContainerId containerId,
+ ContainerState containerState, Resource allocatedResource,
+ String diagnostics, int containerExitStatus) {
+ ContainerRecoveryReport report =
+ Records.newRecord(ContainerRecoveryReport.class);
+ report.setContainerId(containerId);
+ report.setContainerState(containerState);
+ report.setAllocatedResource(allocatedResource);
+ report.setDiagnostics(diagnostics);
+ report.setContainerExitStatus(containerExitStatus);
+ return report;
+ }
+
+ /**
+ * Get the ContainerId of the container.
+ *
+ * @return ContainerId of the container.
+ */
+ public abstract ContainerId getContainerId();
+
+ public abstract void setContainerId(ContainerId containerId);
+
+ /**
+ * Get the allocated Resource of the container.
+ *
+ * @return allocated Resource of the container.
+ */
+ public abstract Resource getAllocatedResource();
+
+
+ public abstract void setAllocatedResource(Resource resource);
+
+ /**
+ * Get the DiagnosticsInfo of the container.
+ *
+ * @return DiagnosticsInfo of the container
+ */
+ public abstract String getDiagnostics();
+
+ public abstract void setDiagnostics(String diagnostics);
+
+
+ public abstract ContainerState getContainerState();
+
+ public abstract void setContainerState(ContainerState containerState);
+
+ /**
+ * Get the final exit status of the container.
+ *
+ * @return final exit status of the container.
+ */
+ public abstract int getContainerExitStatus();
+
+
+ public abstract void setContainerExitStatus(int containerExitStatus);
+
+ /**
+ * Get the Priority of the request.
+ * @return Priority of the request
+ */
+ public abstract Priority getPriority();
+
+ public abstract void setPriority(Priority priority);
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
index 6ca3861..2802d1c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
@@ -20,7 +20,6 @@
import java.util.List;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.Records;
@@ -29,14 +28,14 @@
public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
int httpPort, Resource resource, String nodeManagerVersionId,
- List containerStatuses) {
+ List containerReports) {
RegisterNodeManagerRequest request =
Records.newRecord(RegisterNodeManagerRequest.class);
request.setHttpPort(httpPort);
request.setResource(resource);
request.setNodeId(nodeId);
request.setNMVersion(nodeManagerVersionId);
- request.setContainerStatuses(containerStatuses);
+ request.setContainerRecoveryReports(containerReports);
return request;
}
@@ -44,11 +43,12 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
public abstract int getHttpPort();
public abstract Resource getResource();
public abstract String getNMVersion();
- public abstract List getContainerStatuses();
+ public abstract List getContainerRecoveryReports();
public abstract void setNodeId(NodeId nodeId);
public abstract void setHttpPort(int port);
public abstract void setResource(Resource resource);
public abstract void setNMVersion(String version);
- public abstract void setContainerStatuses(List containerStatuses);
+ public abstract void setContainerRecoveryReports(
+ List containerReports);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ContainerRecoveryReportPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ContainerRecoveryReportPBImpl.java
new file mode 100644
index 0000000..30cefe0
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ContainerRecoveryReportPBImpl.java
@@ -0,0 +1,248 @@
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
+import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerRecoveryReportProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerRecoveryReportProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ContainerRecoveryReport;
+
+import com.google.protobuf.TextFormat;
+
+public class ContainerRecoveryReportPBImpl extends ContainerRecoveryReport {
+
+ ContainerRecoveryReportProto proto = ContainerRecoveryReportProto
+ .getDefaultInstance();
+ ContainerRecoveryReportProto.Builder builder = null;
+ boolean viaProto = false;
+
+ private ContainerId containerId = null;
+ private Resource resource = null;
+ private Priority priority = null;
+
+ public ContainerRecoveryReportPBImpl() {
+ builder = ContainerRecoveryReportProto.newBuilder();
+ }
+
+ public ContainerRecoveryReportPBImpl(ContainerRecoveryReportProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public ContainerRecoveryReportProto getProto() {
+
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public int hashCode() {
+ return this.getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null)
+ return false;
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return TextFormat.shortDebugString(getProto());
+ }
+
+ @Override
+ public Resource getAllocatedResource() {
+ if (this.resource != null) {
+ return this.resource;
+ }
+ ContainerRecoveryReportProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasResource()) {
+ return null;
+ }
+ this.resource = convertFromProtoFormat(p.getResource());
+ return this.resource;
+ }
+
+ @Override
+ public ContainerId getContainerId() {
+ if (this.containerId != null) {
+ return this.containerId;
+ }
+ ContainerRecoveryReportProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasContainerId()) {
+ return null;
+ }
+ this.containerId = convertFromProtoFormat(p.getContainerId());
+ return this.containerId;
+ }
+
+ @Override
+ public String getDiagnostics() {
+ ContainerRecoveryReportProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasDiagnostics()) {
+ return null;
+ }
+ return (p.getDiagnostics());
+ }
+
+ @Override
+ public ContainerState getContainerState() {
+ ContainerRecoveryReportProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasContainerState()) {
+ return null;
+ }
+ return convertFromProtoFormat(p.getContainerState());
+ }
+
+ @Override
+ public void setAllocatedResource(Resource resource) {
+ maybeInitBuilder();
+ if (resource == null)
+ builder.clearResource();
+ this.resource = resource;
+ }
+
+ public void setContainerId(ContainerId containerId) {
+ maybeInitBuilder();
+ if (containerId == null)
+ builder.clearContainerId();
+ this.containerId = containerId;
+ }
+
+ @Override
+ public void setDiagnostics(String diagnosticsInfo) {
+ maybeInitBuilder();
+ if (diagnosticsInfo == null) {
+ builder.clearDiagnostics();
+ return;
+ }
+ builder.setDiagnostics(diagnosticsInfo);
+ }
+
+ @Override
+ public void setContainerState(ContainerState containerState) {
+ maybeInitBuilder();
+ if (containerState == null) {
+ builder.clearContainerState();
+ return;
+ }
+ builder.setContainerState(convertToProtoFormat(containerState));
+ }
+
+ @Override
+ public int getContainerExitStatus() {
+ ContainerRecoveryReportProtoOrBuilder p = viaProto ? proto : builder;
+ return p.getContainerExitStatus();
+ }
+
+ @Override
+ public void setContainerExitStatus(int containerExitStatus) {
+ maybeInitBuilder();
+ builder.setContainerExitStatus(containerExitStatus);
+ }
+
+ @Override
+ public Priority getPriority() {
+ ContainerRecoveryReportProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.priority != null) {
+ return this.priority;
+ }
+ if (!p.hasPriority()) {
+ return null;
+ }
+ this.priority = convertFromProtoFormat(p.getPriority());
+ return this.priority;
+ }
+
+ @Override
+ public void setPriority(Priority priority) {
+ maybeInitBuilder();
+ if (priority == null)
+ builder.clearPriority();
+ this.priority = priority;
+ }
+
+ private void mergeLocalToBuilder() {
+ if (this.containerId != null
+ && !((ContainerIdPBImpl) containerId).getProto().equals(
+ builder.getContainerId())) {
+ builder.setContainerId(convertToProtoFormat(this.containerId));
+ }
+
+ if (this.resource != null
+ && !((ResourcePBImpl) this.resource).getProto().equals(
+ builder.getResource())) {
+ builder.setResource(convertToProtoFormat(this.resource));
+ }
+
+ if (this.priority != null) {
+ builder.setPriority(convertToProtoFormat(this.priority));
+ }
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = ContainerRecoveryReportProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
+ return new ContainerIdPBImpl(p);
+ }
+
+ private ContainerIdProto convertToProtoFormat(ContainerId t) {
+ return ((ContainerIdPBImpl) t).getProto();
+ }
+
+ private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
+ return new ResourcePBImpl(p);
+ }
+
+ private ResourceProto convertToProtoFormat(Resource t) {
+ return ((ResourcePBImpl) t).getProto();
+ }
+
+ private ContainerStateProto
+ convertToProtoFormat(ContainerState containerState) {
+ return ProtoUtils.convertToProtoFormat(containerState);
+ }
+
+ private ContainerState convertFromProtoFormat(
+ ContainerStateProto containerState) {
+ return ProtoUtils.convertFromProtoFormat(containerState);
+ }
+
+ private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
+ return new PriorityPBImpl(p);
+ }
+
+ private PriorityProto convertToProtoFormat(Priority t) {
+ return ((PriorityPBImpl)t).getProto();
+ }
+
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
index 6f9c43d..6085fc2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
@@ -20,24 +20,18 @@
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
-import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
-import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerRecoveryReportProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ContainerRecoveryReport;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
@@ -49,7 +43,7 @@
private Resource resource = null;
private NodeId nodeId = null;
- private List containerStatuses = null;
+ private List containerReports = null;
public RegisterNodeManagerRequestPBImpl() {
builder = RegisterNodeManagerRequestProto.newBuilder();
@@ -68,8 +62,8 @@ public RegisterNodeManagerRequestProto getProto() {
}
private void mergeLocalToBuilder() {
- if (this.containerStatuses != null) {
- addContainerStatusesToProto();
+ if (this.containerReports != null) {
+ addContainerRecoveryReportsToProto();
}
if (this.resource != null) {
builder.setResource(convertToProtoFormat(this.resource));
@@ -80,6 +74,18 @@ private void mergeLocalToBuilder() {
}
+ private synchronized void addContainerRecoveryReportsToProto() {
+ maybeInitBuilder();
+ builder.clearContainerReport();
+ List list =
+ new ArrayList();
+ for (ContainerRecoveryReport report : this.containerReports) {
+ list.add(convertToProtoFormat(report));
+ }
+ builder.addAllContainerReport(list);
+ }
+
+
private void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
@@ -154,63 +160,31 @@ public void setHttpPort(int httpPort) {
}
@Override
- public List getContainerStatuses() {
- initContainerStatuses();
- return containerStatuses;
+ public List getContainerRecoveryReports() {
+ initContainerRecoveryReports();
+ return containerReports;
}
- private void initContainerStatuses() {
- if (this.containerStatuses != null) {
+ private void initContainerRecoveryReports() {
+ if (this.containerReports != null) {
return;
}
RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
- List list = p.getContainerStatusesList();
- this.containerStatuses = new ArrayList();
- for (ContainerStatusProto c : list) {
- this.containerStatuses.add(convertFromProtoFormat(c));
+ List list = p.getContainerReportList();
+ this.containerReports = new ArrayList();
+ for (ContainerRecoveryReportProto c : list) {
+ this.containerReports.add(convertFromProtoFormat(c));
}
}
@Override
- public void setContainerStatuses(List containers) {
- if (containers == null) {
- return;
- }
- initContainerStatuses();
- this.containerStatuses.addAll(containers);
- }
-
- private void addContainerStatusesToProto() {
- maybeInitBuilder();
- builder.clearContainerStatuses();
- if (containerStatuses == null) {
+ public void setContainerRecoveryReports(
+ List containerReports) {
+ if (containerReports == null) {
return;
}
- Iterable it = new Iterable() {
-
- @Override
- public Iterator iterator() {
- return new Iterator() {
- Iterator iter = containerStatuses.iterator();
-
- @Override
- public boolean hasNext() {
- return iter.hasNext();
- }
-
- @Override
- public ContainerStatusProto next() {
- return convertToProtoFormat(iter.next());
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- }
- };
- builder.addAllContainerStatuses(it);
+ initContainerRecoveryReports();
+ this.containerReports.addAll(containerReports);
}
@Override
@@ -259,11 +233,11 @@ private ResourceProto convertToProtoFormat(Resource t) {
return ((ResourcePBImpl)t).getProto();
}
- private ContainerStatusPBImpl convertFromProtoFormat(ContainerStatusProto c) {
- return new ContainerStatusPBImpl(c);
+ private ContainerRecoveryReportPBImpl convertFromProtoFormat(ContainerRecoveryReportProto c) {
+ return new ContainerRecoveryReportPBImpl(c);
}
- private ContainerStatusProto convertToProtoFormat(ContainerStatus c) {
- return ((ContainerStatusPBImpl)c).getProto();
+ private ContainerRecoveryReportProto convertToProtoFormat(ContainerRecoveryReport c) {
+ return ((ContainerRecoveryReportPBImpl)c).getProto();
}
}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index c544905..0230515 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -30,7 +30,7 @@ message RegisterNodeManagerRequestProto {
optional int32 http_port = 3;
optional ResourceProto resource = 4;
optional string nm_version = 5;
- repeated ContainerStatusProto containerStatuses = 6;
+ repeated ContainerRecoveryReportProto container_report = 6;
}
message RegisterNodeManagerResponseProto {
@@ -58,3 +58,12 @@ message NodeHeartbeatResponseProto {
optional int64 nextHeartBeatInterval = 7;
optional string diagnostics_message = 8;
}
+
+message ContainerRecoveryReportProto {
+ optional ContainerIdProto container_id = 1;
+ optional ContainerStateProto container_state = 2;
+ optional ResourceProto resource = 3;
+ optional PriorityProto priority = 4;
+ optional string diagnostics = 5 [default = "N/A"];
+ optional int32 container_exit_status = 6;
+}
\ No newline at end of file
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestProtocolRecords.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestProtocolRecords.java
new file mode 100644
index 0000000..dd5f9d9
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestProtocolRecords.java
@@ -0,0 +1,56 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ContainerRecoveryReport;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ContainerRecoveryReportPBImpl;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestProtocolRecords {
+
+ @Test
+ public void testContainerRecoveryReport() {
+ ApplicationId appId = ApplicationId.newInstance(123456789, 1);
+ ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
+ ContainerId containerId = ContainerId.newInstance(attemptId, 1);
+ Resource resource = Resource.newInstance(1000, 200);
+
+ ContainerRecoveryReport report =
+ ContainerRecoveryReport.newInstance(containerId,
+ ContainerState.COMPLETE, resource, "diagnostics",
+ ContainerExitStatus.ABORTED);
+ ContainerRecoveryReport reportProto =
+ new ContainerRecoveryReportPBImpl(
+ ((ContainerRecoveryReportPBImpl) report).getProto());
+ Assert.assertEquals("diagnostics", reportProto.getDiagnostics());
+ Assert.assertEquals(resource, reportProto.getAllocatedResource());
+ Assert.assertEquals(ContainerExitStatus.ABORTED,
+ reportProto.getContainerExitStatus());
+ Assert.assertEquals(ContainerState.COMPLETE,
+ reportProto.getContainerState());
+ Assert.assertEquals(containerId, reportProto.getContainerId());
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index df99737..56c50fe 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -53,6 +53,7 @@
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.ServerRMProxy;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ContainerRecoveryReport;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
@@ -246,13 +247,12 @@ protected ResourceTracker getRMClient() throws IOException {
@VisibleForTesting
protected void registerWithRM()
throws YarnException, IOException {
- List containerStatuses = getContainerStatuses();
+ List containerReports = getContainerRecoveryReports();
RegisterNodeManagerRequest request =
RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
- nodeManagerVersionId, containerStatuses);
- if (containerStatuses != null) {
- LOG.info("Registering with RM using finished containers :"
- + containerStatuses);
+ nodeManagerVersionId, containerReports);
+ if (containerReports != null) {
+ LOG.info("Registering with RM using containers :" + containerReports);
}
RegisterNodeManagerResponse regNMResponse =
resourceTracker.registerNodeManager(request);
@@ -375,6 +375,26 @@ private NodeStatus getNodeStatus(int responseId) {
return containerStatuses;
}
+ private List getContainerRecoveryReports() {
+ List containerReports =
+ new ArrayList();
+ for (Container container : this.context.getContainers().values()) {
+ ContainerRecoveryReport report =
+ container.getContainerRecoveryReport();
+ containerReports.add(report);
+ if (report.getContainerState().equals(ContainerState.COMPLETE)) {
+ // Adding to finished containers cache. Cache will keep it around at
+ // least for #durationToTrackStoppedContainers duration. In the
+ // subsequent call to stop container it will get removed from cache.
+ updateStoppedContainersInCache(container.getContainerId());
+ addCompletedContainer(container);
+ }
+ }
+ LOG.info("Sending out " + containerReports.size()
+ + " containers for recovery: " + containerReports);
+ return containerReports;
+ }
+
private void addCompletedContainer(Container container) {
synchronized (previousCompletedContainers) {
previousCompletedContainers.add(container.getContainerId());
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
index e69e61a..c168f7a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ContainerRecoveryReport;
public interface Container extends EventHandler {
@@ -39,7 +40,7 @@
ContainerTokenIdentifier getContainerTokenIdentifier();
String getUser();
-
+
ContainerState getContainerState();
ContainerLaunchContext getLaunchContext();
@@ -50,6 +51,8 @@
ContainerStatus cloneAndGetContainerStatus();
+ ContainerRecoveryReport getContainerRecoveryReport();
+
String toString();
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index 50653f5..38cf15a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -47,6 +47,7 @@
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ContainerRecoveryReport;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
@@ -388,6 +389,17 @@ public ContainerStatus cloneAndGetContainerStatus() {
}
@Override
+ public ContainerRecoveryReport getContainerRecoveryReport() {
+ this.readLock.lock();
+ try {
+ return ContainerRecoveryReport.newInstance(this.containerId,
+ getCurrentState(), getResource(), diagnostics.toString(), exitCode);
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ @Override
public ContainerId getContainerId() {
return this.containerId;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
index 5c2e085..fef3bd7 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
@@ -27,8 +27,6 @@
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.junit.Assert;
-
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
@@ -46,6 +44,7 @@
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ContainerRecoveryReport;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
@@ -57,6 +56,7 @@
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -203,7 +203,7 @@ public RegisterNodeManagerResponse registerNodeManager(
if (registerCount == 0) {
// first register, no containers info.
try {
- Assert.assertEquals(0, request.getContainerStatuses()
+ Assert.assertEquals(0, request.getContainerRecoveryReports()
.size());
} catch (AssertionError error) {
error.printStackTrace();
@@ -214,8 +214,8 @@ public RegisterNodeManagerResponse registerNodeManager(
testCompleteContainer.getContainerId(), container);
} else {
// second register contains the completed container info.
- List statuses =
- request.getContainerStatuses();
+ List statuses =
+ request.getContainerRecoveryReports();
try {
Assert.assertEquals(1, statuses.size());
Assert.assertEquals(testCompleteContainer.getContainerId(),
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
index a021214..5ef3766 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
@@ -36,6 +36,7 @@
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ContainerRecoveryReport;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
@@ -134,4 +135,9 @@ public Resource getResource() {
public ContainerTokenIdentifier getContainerTokenIdentifier() {
return this.containerTokenIdentifier;
}
+
+ @Override
+ public ContainerRecoveryReport getContainerRecoveryReport() {
+ return null;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 1d40320..409fbee 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -32,7 +32,6 @@
import org.apache.hadoop.util.VersionUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -60,6 +59,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -240,13 +240,6 @@ public RegisterNodeManagerResponse registerNodeManager(
Resource capability = request.getResource();
String nodeManagerVersion = request.getNMVersion();
- if (!request.getContainerStatuses().isEmpty()) {
- LOG.info("received container statuses on node manager register :"
- + request.getContainerStatuses());
- for (ContainerStatus containerStatus : request.getContainerStatuses()) {
- handleContainerStatus(containerStatus);
- }
- }
RegisterNodeManagerResponse response = recordFactory
.newRecordInstance(RegisterNodeManagerResponse.class);
@@ -305,7 +298,7 @@ public RegisterNodeManagerResponse registerNodeManager(
RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
if (oldNode == null) {
this.rmContext.getDispatcher().getEventHandler().handle(
- new RMNodeEvent(nodeId, RMNodeEventType.STARTED));
+ new RMNodeStartEvent(nodeId, request.getContainerRecoveryReports()));
} else {
LOG.info("Reconnect from the node at: " + host);
this.nmLivelinessMonitor.unregister(nodeId);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index f4f2e20..0b3c5ec 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -199,12 +199,6 @@
new FinalSavingTransition(FINISHED_TRANSITION, RMAppState.FINISHED))
.addTransition(RMAppState.ACCEPTED, RMAppState.KILLING,
RMAppEventType.KILL, new KillAttemptTransition())
- // ACCECPTED state can once again receive APP_ACCEPTED event, because on
- // recovery the app returns ACCEPTED state and the app once again go
- // through the scheduler and triggers one more APP_ACCEPTED event at
- // ACCEPTED state.
- .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
- RMAppEventType.APP_ACCEPTED)
// Transitions from RUNNING state
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
@@ -723,29 +717,36 @@ public void transition(RMAppImpl app, RMAppEvent event) {
}
}
+ // synchronously recover attempt to ensure any incoming external events
+ // to be processed after the attempt processes the recover event.
+ private void recoverAppAttempts() {
+ for (RMAppAttempt attempt : getAppAttempts().values()) {
+ attempt.handle(new RMAppAttemptEvent(attempt.getAppAttemptId(),
+ RMAppAttemptEventType.RECOVER));
+ }
+ }
+
private static final class RMAppRecoveredTransition implements
MultipleArcTransition {
@Override
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
- for (RMAppAttempt attempt : app.getAppAttempts().values()) {
- // synchronously recover attempt to ensure any incoming external events
- // to be processed after the attempt processes the recover event.
- attempt.handle(
- new RMAppAttemptEvent(attempt.getAppAttemptId(),
- RMAppAttemptEventType.RECOVER));
- }
-
// The app has completed.
if (app.recoveredFinalState != null) {
+ app.recoverAppAttempts();
new FinalTransition(app.recoveredFinalState).transition(app, event);
return app.recoveredFinalState;
}
- // Last attempt is in final state, do not add to scheduler and just return
- // ACCEPTED waiting for last RMAppAttempt to send finished or failed event
- // back.
+ // Notify scheduler about the app on recovery
+ new AddApplicationToSchedulerTransition().transition(app, event);
+
+ // recover attempts
+ app.recoverAppAttempts();
+
+ // Last attempt is in final state, return ACCEPTED waiting for last
+ // RMAppAttempt to send finished or failed event back.
if (app.currentAttempt != null
&& (app.currentAttempt.getState() == RMAppAttemptState.KILLED
|| app.currentAttempt.getState() == RMAppAttemptState.FINISHED
@@ -754,9 +755,6 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
return RMAppState.ACCEPTED;
}
- // Notify scheduler about the app on recovery
- new AddApplicationToSchedulerTransition().transition(app, event);
-
// No existent attempts means the attempt associated with this app was not
// started or started but not yet saved.
if (app.attempts.isEmpty()) {
@@ -1055,8 +1053,12 @@ public void transition(RMAppImpl app, RMAppEvent event) {
if (app.finishTime == 0 ) {
app.finishTime = System.currentTimeMillis();
}
- app.handler.handle(new AppRemovedSchedulerEvent(app.applicationId,
- finalState));
+ // Recovered apps that are completed were not added to scheduler, so no
+ // need to remove them from scheduler.
+ if (app.recoveredFinalState == null) {
+ app.handler.handle(new AppRemovedSchedulerEvent(app.applicationId,
+ finalState));
+ }
app.handler.handle(
new RMAppManagerEvent(app.applicationId,
RMAppManagerEventType.APP_COMPLETED));
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
index b4bad12..4a479f8 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
@@ -203,5 +203,4 @@
* @return {@link ApplicationAttemptReport}
*/
ApplicationAttemptReport createApplicationAttemptReport();
-
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index e289ad5..912fbaa 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -267,15 +267,17 @@
.addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.CONTAINER_FINISHED,
new FinalSavingTransition(
- new AMContainerCrashedTransition(), RMAppAttemptState.FAILED))
+ new AMContainerCrashedBeforeRunningTransition(), RMAppAttemptState.FAILED))
// Transitions from LAUNCHED State
.addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.RUNNING,
RMAppAttemptEventType.REGISTERED, new AMRegisteredTransition())
- .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING,
+ .addTransition(RMAppAttemptState.LAUNCHED,
+ EnumSet.of(RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING),
RMAppAttemptEventType.CONTAINER_FINISHED,
- new FinalSavingTransition(
- new AMContainerCrashedTransition(), RMAppAttemptState.FAILED))
+ new ContainerFinishedTransition(
+ new AMContainerCrashedBeforeRunningTransition(),
+ RMAppAttemptState.LAUNCHED))
.addTransition(
RMAppAttemptState.LAUNCHED, RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.EXPIRE,
@@ -302,7 +304,9 @@
RMAppAttemptState.RUNNING,
EnumSet.of(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING),
RMAppAttemptEventType.CONTAINER_FINISHED,
- new ContainerFinishedTransition())
+ new ContainerFinishedTransition(
+ new AMContainerCrashedAtRunningTransition(),
+ RMAppAttemptState.RUNNING))
.addTransition(
RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING,
RMAppAttemptEventType.EXPIRE,
@@ -904,6 +908,10 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
}
return appAttempt.recoveredFinalState;
} else {
+ // Add the current attempt to the scheduler.
+ appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
+ appAttempt.getAppAttemptId(), false));
+
/*
* Since the application attempt's final state is not saved that means
* for AM container (previous attempt) state must be one of these.
@@ -1207,17 +1215,16 @@ public void transition(RMAppAttemptImpl appAttempt,
}
}
- private static final class AMContainerCrashedTransition extends
+ private static final class AMContainerCrashedBeforeRunningTransition extends
BaseFinalTransition {
- public AMContainerCrashedTransition() {
+ public AMContainerCrashedBeforeRunningTransition() {
super(RMAppAttemptState.FAILED);
}
@Override
public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
-
RMAppAttemptContainerFinishedEvent finishEvent =
((RMAppAttemptContainerFinishedEvent)event);
@@ -1410,6 +1417,16 @@ public void transition(RMAppAttemptImpl appAttempt,
implements
MultipleArcTransition {
+ // The transition To Do after attempt final state is saved.
+ private BaseTransition transitionToDo;
+ private RMAppAttemptState currentState;
+
+ public ContainerFinishedTransition(BaseTransition transitionToDo,
+ RMAppAttemptState currentState) {
+ this.transitionToDo = transitionToDo;
+ this.currentState = currentState;
+ }
+
@Override
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
@@ -1426,14 +1443,13 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
containerStatus.getContainerId())) {
// Remember the follow up transition and save the final attempt state.
appAttempt.rememberTargetTransitionsAndStoreState(event,
- new ContainerFinishedFinalStateSavedTransition(),
- RMAppAttemptState.FAILED, RMAppAttemptState.FAILED);
+ transitionToDo, RMAppAttemptState.FAILED, RMAppAttemptState.FAILED);
return RMAppAttemptState.FINAL_SAVING;
}
// Normal container.Put it in completedcontainers list
appAttempt.justFinishedContainers.add(containerStatus);
- return RMAppAttemptState.RUNNING;
+ return this.currentState;
}
}
@@ -1451,7 +1467,7 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
}
}
- private static class ContainerFinishedFinalStateSavedTransition extends
+ private static class AMContainerCrashedAtRunningTransition extends
BaseTransition {
@Override
public void
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java
index ace4435..259d68b3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerEventType.java
@@ -33,5 +33,7 @@
RELEASED,
// Source: ContainerAllocationExpirer
- EXPIRE
+ EXPIRE,
+
+ RECOVER
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index 2921891..1cc7d44 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -35,12 +35,14 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ContainerRecoveryReport;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
@@ -65,6 +67,9 @@
RMContainerEventType.KILL)
.addTransition(RMContainerState.NEW, RMContainerState.RESERVED,
RMContainerEventType.RESERVED, new ContainerReservedTransition())
+ .addTransition(RMContainerState.NEW,
+ EnumSet.of(RMContainerState.RUNNING, RMContainerState.COMPLETED),
+ RMContainerEventType.RECOVER, new ContainerRecoveredTransition())
// Transitions from RESERVED state
.addTransition(RMContainerState.RESERVED, RMContainerState.RESERVED,
@@ -341,6 +346,34 @@ public void transition(RMContainerImpl cont, RMContainerEvent event) {
}
}
+ private static final class ContainerRecoveredTransition
+ implements
+ MultipleArcTransition {
+ @Override
+ public RMContainerState transition(RMContainerImpl container,
+ RMContainerEvent event) {
+ ContainerRecoveryReport report =
+ ((RMContainerRecoverEvent) event).getContainerReport();
+ if (report.getContainerState().equals(ContainerState.COMPLETE)) {
+ ContainerStatus status =
+ ContainerStatus.newInstance(report.getContainerId(),
+ report.getContainerState(), report.getDiagnostics(),
+ report.getContainerExitStatus());
+
+ new FinishedTransition().transition(container,
+ new RMContainerFinishedEvent(container.containerId, status,
+ RMContainerEventType.FINISHED));
+ return RMContainerState.COMPLETED;
+ } else if (report.getContainerState().equals(ContainerState.RUNNING)) {
+ return RMContainerState.RUNNING;
+ } else {
+ LOG.warn("RMContainer received unexpected recover event with container"
+ + " state " + report.getContainerState() + " while recovering.");
+ return RMContainerState.RUNNING;
+ }
+ }
+ }
+
private static final class ContainerReservedTransition extends
BaseTransition {
@@ -398,7 +431,6 @@ public void transition(RMContainerImpl container, RMContainerEvent event) {
// Inform AppAttempt
container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent(
container.appAttemptId, finishedEvent.getRemoteContainerStatus()));
-
container.rmContext.getRMApplicationHistoryWriter()
.containerFinished(container);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerRecoverEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerRecoverEvent.java
new file mode 100644
index 0000000..f430486
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerRecoverEvent.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ContainerRecoveryReport;
+
+public class RMContainerRecoverEvent extends RMContainerEvent {
+
+ private final ContainerRecoveryReport containerReport;
+
+ public RMContainerRecoverEvent(ContainerId containerId,
+ ContainerRecoveryReport containerReport) {
+ super(containerId, RMContainerEventType.RECOVER);
+ this.containerReport = containerReport;
+ }
+
+ public ContainerRecoveryReport getContainerReport() {
+ return containerReport;
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 0d33796..0992ca3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -48,6 +48,7 @@
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ContainerRecoveryReport;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
@@ -460,13 +461,9 @@ private void updateMetricsForDeactivatedNode(NodeState initialState,
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
// Inform the scheduler
+ RMNodeStartEvent startEvent = (RMNodeStartEvent) event;
+ List containers = null;
- rmNode.context.getDispatcher().getEventHandler().handle(
- new NodeAddedSchedulerEvent(rmNode));
- rmNode.context.getDispatcher().getEventHandler().handle(
- new NodesListManagerEvent(
- NodesListManagerEventType.NODE_USABLE, rmNode));
-
String host = rmNode.nodeId.getHost();
if (rmNode.context.getInactiveRMNodes().containsKey(host)) {
// Old node rejoining
@@ -476,10 +473,17 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
} else {
// Increment activeNodes explicitly because this is a new node.
ClusterMetrics.getMetrics().incrNumActiveNodes();
+ containers = startEvent.getContainerRecoveryReports();
}
+
+ rmNode.context.getDispatcher().getEventHandler()
+ .handle(new NodeAddedSchedulerEvent(rmNode, containers));
+ rmNode.context.getDispatcher().getEventHandler().handle(
+ new NodesListManagerEvent(
+ NodesListManagerEventType.NODE_USABLE, rmNode));
}
}
-
+
public static class ReconnectNodeTransition implements
SingleArcTransition {
@@ -513,7 +517,7 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
}
rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode);
rmNode.context.getDispatcher().getEventHandler().handle(
- new RMNodeEvent(newNode.getNodeID(), RMNodeEventType.STARTED));
+ new RMNodeStartEvent(newNode.getNodeID(), null));
}
rmNode.context.getDispatcher().getEventHandler().handle(
new NodesListManagerEvent(
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartEvent.java
new file mode 100644
index 0000000..1b6aac9
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartEvent.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ContainerRecoveryReport;
+
+public class RMNodeStartEvent extends RMNodeEvent {
+
+ private List containerReports;
+
+ public RMNodeStartEvent(NodeId nodeId, List containerReports) {
+ super(nodeId, RMNodeEventType.STARTED);
+ this.containerReports = containerReports;
+ }
+
+ public List getContainerRecoveryReports() {
+ return this.containerReports;
+ }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 642cd31..45b3c7c 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -34,11 +34,22 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ContainerRecoveryReport;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.util.resource.Resources;
+@SuppressWarnings("unchecked")
public abstract class AbstractYarnScheduler
implements ResourceScheduler {
@@ -46,8 +57,7 @@
private static final Log LOG = LogFactory.getLog(AbstractYarnScheduler.class);
// Nodes in the cluster, indexed by NodeId
- protected Map nodes =
- new ConcurrentHashMap();
+ protected Map nodes = new ConcurrentHashMap();
// Whole capacity of the cluster
protected Resource clusterResource = Resource.newInstance(0, 0);
@@ -57,6 +67,7 @@
protected RMContext rmContext;
protected Map> applications;
+
protected final static List EMPTY_CONTAINER_LIST =
new ArrayList();
protected static final Allocation EMPTY_ALLOCATION = new Allocation(
@@ -159,4 +170,95 @@ public String moveApplication(ApplicationId appId, String newQueue)
throw new YarnException(getClass().getSimpleName()
+ " does not support moving apps between queues");
}
+
+ public void recoverContainersOnNode(
+ List containerReports, RMNode nm) {
+ if (containerReports == null
+ || (containerReports != null && containerReports.isEmpty())) {
+ return;
+ }
+
+ for (ContainerRecoveryReport status : containerReports) {
+ ApplicationId appId =
+ status.getContainerId().getApplicationAttemptId().getApplicationId();
+ RMApp rmApp = rmContext.getRMApps().get(appId);
+ if (rmApp == null) {
+ LOG.error("Skip recovering container " + status
+ + " for unknown application.");
+ continue;
+ }
+
+ if (rmApp.getApplicationSubmissionContext().getUnmanagedAM()) {
+ LOG.info("Skip recovering container " + status + " for unmanaged AM."
+ + rmApp.getApplicationId());
+ continue;
+ }
+
+ SchedulerApplication schedulerApp = applications.get(appId);
+ if (schedulerApp == null) {
+ LOG.info("Skip recovering container " + status
+ + " for unknown SchedulerApplication. Application state is "
+ + rmApp.getState());
+ continue;
+ }
+
+ LOG.info("Recovering container " + status);
+ SchedulerApplicationAttempt schedulerAttempt =
+ schedulerApp.getCurrentAppAttempt();
+
+ // create container
+ RMContainer rmContainer = createContainer(status, nm);
+
+ // recover RMContainer
+ rmContainer.handle(new RMContainerRecoverEvent(status.getContainerId(),
+ status));
+
+ // recover scheduler node
+ nodes.get(nm.getNodeID()).recoverContainer(rmContainer);
+
+ // recover queue: update headroom etc.
+ Queue queue = schedulerAttempt.getQueue();
+ queue.recoverContainer(clusterResource, schedulerAttempt, rmContainer);
+
+ // recover scheduler attempt
+ schedulerAttempt.recoverContainer(rmContainer);
+ }
+ }
+
+ private RMContainer createContainer(ContainerRecoveryReport status,
+ RMNode node) {
+ Container container =
+ Container.newInstance(status.getContainerId(), node.getNodeID(),
+ node.getHttpAddress(), status.getAllocatedResource(),
+ status.getPriority(), null);
+ ApplicationAttemptId attemptId =
+ container.getId().getApplicationAttemptId();
+ RMContainer rmContainer =
+ new RMContainerImpl(container, attemptId, node.getNodeID(),
+ applications.get(attemptId.getApplicationId()).getUser(), rmContext);
+ return rmContainer;
+ }
+
+ public SchedulerNode getSchedulerNode(NodeId nodeId) {
+ return nodes.get(nodeId);
+ }
+
+ protected void notifyAppAccepted(ApplicationId appId) {
+ // No need to re-send app_accepted event to recovered apps.
+ if (!rmContext.getRMApps().get(appId).getState()
+ .equals(RMAppState.ACCEPTED)) {
+ rmContext.getDispatcher().getEventHandler()
+ .handle(new RMAppEvent(appId, RMAppEventType.APP_ACCEPTED));
+ }
+ }
+
+ protected void notifyAttemptAdded(ApplicationAttemptId attemptId) {
+ // If attempt is already at launched state, that means this is a recovered
+ // attempt. So no need to re-send attempt_added event to recovered attempts.
+ if (!rmContext.getRMApps().get(attemptId.getApplicationId())
+ .getRMAppAttempt(attemptId).getState().equals(RMAppAttemptState.LAUNCHED)) {
+ rmContext.getDispatcher().getEventHandler() .handle(
+ new RMAppAttemptEvent(attemptId, RMAppAttemptEventType.ATTEMPT_ADDED));
+ }
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index de71f71..1d2f16e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -26,7 +26,6 @@
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -39,6 +38,8 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
@@ -55,7 +56,8 @@
private final String queueName;
Queue queue;
final String user;
- private final AtomicInteger containerIdCounter = new AtomicInteger(0);
+ private int containerIdCounter = 0;
+ Object object = new Object();
final Set priorities = new TreeSet(
new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
@@ -109,7 +111,10 @@ private synchronized void clearRequests() {
}
public int getNewContainerId() {
- return this.containerIdCounter.incrementAndGet();
+ synchronized (object) {
+ containerIdCounter++;
+ return containerIdCounter;
+ }
}
/**
@@ -409,4 +414,31 @@ public synchronized void transferStateFromPreviousAppSchedulingInfo(
// this.requests = appInfo.getRequests();
this.blacklist = appInfo.getBlackList();
}
+
+ public synchronized void recoverContainer(RMContainer rmContainer) {
+ int containerId = rmContainer.getContainerId().getId();
+ // set the containerId counter to be the max of all the Ids of recovered
+ // containers.
+ synchronized (object) {
+ if (containerIdCounter < containerId) {
+ containerIdCounter = containerId;
+ }
+ }
+
+ QueueMetrics metrics = queue.getMetrics();
+ if (pending) {
+ // If there was any container to recover, the application was
+ // running from scheduler's POV.
+ pending = false;
+ metrics.runAppAttempt(applicationId, user);
+ }
+
+ // Container is completed. Skip recovering resources.
+ if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+ return;
+ }
+
+ metrics.allocateResources(user, 1, rmContainer.getAllocatedResource(),
+ false);
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
index c51f819..f6fb59a 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
@@ -26,6 +26,8 @@
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@Evolving
@LimitedPrivate("yarn")
@@ -60,4 +62,13 @@
boolean hasAccess(QueueACL acl, UserGroupInformation user);
public ActiveUsersManager getActiveUsersManager();
+
+ /**
+ * Recover the state of the queue
+ * @param clusterResource the resource of the cluster
+ * @param application the application for which the container was allocated
+ * @param container the container that was recovered.
+ */
+ public void recoverContainer(Resource clusterResource,
+ SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer);
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index fc7e047..a5a1cf3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -46,6 +46,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -499,5 +500,25 @@ public synchronized void move(Queue newQueue) {
appSchedulingInfo.move(newQueue);
this.queue = newQueue;
- }
+ }
+
+ public synchronized void recoverContainer(RMContainer rmContainer) {
+ // recover app scheduling info
+ appSchedulingInfo.recoverContainer(rmContainer);
+
+ if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+ return;
+ }
+ LOG.info("SchedulerAttempt " + getApplicationAttemptId()
+ + " is recovering container " + rmContainer.getContainerId());
+ liveContainers.put(rmContainer.getContainerId(), rmContainer);
+ Resources.addTo(currentConsumption, rmContainer.getContainer()
+ .getResource());
+ // resourceLimit: updated when LeafQueue#recoverContainer#allocateResource
+ // is called.
+
+// newlyAllocatedContainers.add(rmContainer);
+// schedulingOpportunities
+// lastScheduledContainer
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index 85d016b..dbf987e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -27,7 +27,6 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -35,10 +34,10 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.util.resource.Resources;
-import com.google.common.base.Preconditions;
/**
* Represents a YARN Cluster Node from the viewpoint of the scheduler.
@@ -119,13 +118,10 @@ public String getRackName() {
* The Scheduler has allocated containers on this node to the given
* application.
*
- * @param applicationId
- * application
* @param rmContainer
* allocated container
*/
- public synchronized void allocateContainer(ApplicationId applicationId,
- RMContainer rmContainer) {
+ public synchronized void allocateContainer(RMContainer rmContainer) {
Container container = rmContainer.getContainer();
deductAvailableResource(container.getResource());
++numContainers;
@@ -166,8 +162,8 @@ public Resource getTotalResource() {
return this.totalResourceCapability;
}
- private synchronized boolean isValidContainer(Container c) {
- if (launchedContainers.containsKey(c.getId())) {
+ public synchronized boolean isValidContainer(ContainerId containerId) {
+ if (launchedContainers.containsKey(containerId)) {
return true;
}
return false;
@@ -185,7 +181,7 @@ private synchronized void updateResource(Container container) {
* container to be released
*/
public synchronized void releaseContainer(Container container) {
- if (!isValidContainer(container)) {
+ if (!isValidContainer(container.getId())) {
LOG.error("Invalid container released " + container);
return;
}
@@ -274,4 +270,12 @@ public synchronized RMContainer getReservedContainer() {
// we can only adjust available resource if total resource is changed.
Resources.addTo(this.availableResource, deltaResource);
}
+
+
+ public void recoverContainer(RMContainer rmContainer) {
+ if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+ return;
+ }
+ allocateContainer(rmContainer);
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index f5090ba..ccb71e2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -28,7 +28,6 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueState;
@@ -235,15 +234,6 @@ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
public ActiveUsersManager getActiveUsersManager();
/**
- * Recover the state of the queue
- * @param clusterResource the resource of the cluster
- * @param application the application for which the container was allocated
- * @param container the container that was recovered.
- */
- public void recoverContainer(Resource clusterResource, FiCaSchedulerApp application,
- Container container);
-
- /**
* Adds all applications in the queue and its subqueues to the given collection.
* @param apps the collection to add the applications to
*/
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 9eed61f..9634887 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -55,12 +55,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@@ -513,8 +509,8 @@ private synchronized void addApplication(ApplicationId applicationId,
applications.put(applicationId, application);
LOG.info("Accepted application " + applicationId + " from user: " + user
+ ", in queue: " + queueName);
- rmContext.getDispatcher().getEventHandler()
- .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+
+ notifyAppAccepted(applicationId);
}
private synchronized void addApplicationAttempt(
@@ -537,9 +533,8 @@ private synchronized void addApplicationAttempt(
LOG.info("Added Application Attempt " + applicationAttemptId
+ " to scheduler from user " + application.getUser() + " in queue "
+ queue.getQueueName());
- rmContext.getDispatcher().getEventHandler() .handle(
- new RMAppAttemptEvent(applicationAttemptId,
- RMAppAttemptEventType.ATTEMPT_ADDED));
+
+ notifyAttemptAdded(applicationAttemptId);
}
private synchronized void doneApplication(ApplicationId applicationId,
@@ -834,6 +829,8 @@ public void handle(SchedulerEvent event) {
{
NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
addNode(nodeAddedEvent.getAddedRMNode());
+ recoverContainersOnNode(nodeAddedEvent.getContainerReports(),
+ nodeAddedEvent.getAddedRMNode());
}
break;
case NODE_REMOVED:
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 5ddb9a4..65938aa 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -59,15 +59,17 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
+import com.google.common.annotations.VisibleForTesting;
+
@Private
@Unstable
public class LeafQueue implements CSQueue {
@@ -564,7 +566,8 @@ public String toString() {
"numContainers=" + getNumContainers();
}
- private synchronized User getUser(String userName) {
+ @VisibleForTesting
+ public synchronized User getUser(String userName) {
User user = users.get(userName);
if (user == null) {
user = new User();
@@ -1346,8 +1349,7 @@ private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode nod
}
// Inform the node
- node.allocateContainer(application.getApplicationId(),
- allocatedContainer);
+ node.allocateContainer(allocatedContainer);
LOG.info("assignedContainer" +
" application attempt=" + application.getApplicationAttemptId() +
@@ -1446,7 +1448,7 @@ public void completedContainer(Resource clusterResource,
}
synchronized void allocateResource(Resource clusterResource,
- FiCaSchedulerApp application, Resource resource) {
+ SchedulerApplicationAttempt application, Resource resource) {
// Update queue metrics
Resources.addTo(usedResources, resource);
CSQueueUtils.updateQueueStatistics(
@@ -1530,7 +1532,8 @@ public QueueMetrics getMetrics() {
return metrics;
}
- static class User {
+ @VisibleForTesting
+ public static class User {
Resource consumed = Resources.createResource(0, 0);
int pendingApplications = 0;
int activeApplications = 0;
@@ -1580,13 +1583,16 @@ public synchronized void releaseContainer(Resource resource) {
@Override
public void recoverContainer(Resource clusterResource,
- FiCaSchedulerApp application, Container container) {
+ SchedulerApplicationAttempt attempt, RMContainer rmContainer) {
+ if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+ return;
+ }
// Careful! Locking order is important!
synchronized (this) {
- allocateResource(clusterResource, application, container.getResource());
+ allocateResource(clusterResource, attempt, rmContainer.getContainer()
+ .getResource());
}
- getParent().recoverContainer(clusterResource, application, container);
-
+ getParent().recoverContainer(clusterResource, attempt, rmContainer);
}
/**
@@ -1613,5 +1619,4 @@ public void collectSchedulerApplications(
apps.add(app.getApplicationAttemptId());
}
}
-
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index dba92a6..d83eed3 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -38,7 +38,6 @@
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -49,9 +48,11 @@
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -770,13 +771,16 @@ public QueueMetrics getMetrics() {
@Override
public void recoverContainer(Resource clusterResource,
- FiCaSchedulerApp application, Container container) {
+ SchedulerApplicationAttempt attempt, RMContainer rmContainer) {
+ if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+ return;
+ }
// Careful! Locking order is important!
synchronized (this) {
- allocateResource(clusterResource, container.getResource());
+ allocateResource(clusterResource,rmContainer.getContainer().getResource());
}
if (parent != null) {
- parent.recoverContainer(clusterResource, application, container);
+ parent.recoverContainer(clusterResource, attempt, rmContainer);
}
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
index 7bab760..5227aac 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
@@ -24,6 +24,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java
index c487f48..9d01bbf 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAddedSchedulerEvent.java
@@ -18,19 +18,34 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
+import java.util.List;
+
+import org.apache.hadoop.yarn.server.api.protocolrecords.ContainerRecoveryReport;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
public class NodeAddedSchedulerEvent extends SchedulerEvent {
private final RMNode rmNode;
+ private final List containerReports;
public NodeAddedSchedulerEvent(RMNode rmNode) {
super(SchedulerEventType.NODE_ADDED);
this.rmNode = rmNode;
+ this.containerReports = null;
+ }
+
+ public NodeAddedSchedulerEvent(RMNode rmNode,
+ List containerReports) {
+ super(SchedulerEventType.NODE_ADDED);
+ this.rmNode = rmNode;
+ this.containerReports = containerReports;
}
public RMNode getAddedRMNode() {
return rmNode;
}
+ public List getContainerReports() {
+ return containerReports;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
index 9ed5179..5376d15 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
@@ -31,8 +31,6 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
@@ -259,8 +257,7 @@ private Resource assignContainer(FSSchedulerNode node,
}
// Inform the node
- node.allocateContainer(app.getApplicationId(),
- allocatedContainer);
+ node.allocateContainer(allocatedContainer);
return container.getResource();
} else {
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
index e842a6a..b396b07 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
@@ -33,8 +33,10 @@
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
@@ -254,4 +256,11 @@ public int getNumRunnableApps() {
public ActiveUsersManager getActiveUsersManager() {
return activeUsersManager;
}
+
+ @Override
+ public void recoverContainer(Resource clusterResource,
+ SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) {
+ // TODO Auto-generated method stub
+
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
index 427cb86..67e5104 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
@@ -33,7 +33,9 @@
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@Private
@Unstable
@@ -200,4 +202,11 @@ public ActiveUsersManager getActiveUsersManager() {
// Should never be called since all applications are submitted to LeafQueues
return null;
}
+
+ @Override
+ public void recoverContainer(Resource clusterResource,
+ SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) {
+ // TODO Auto-generated method stub
+
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 5eeda64..0b6f239 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -58,12 +58,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@@ -598,8 +594,7 @@ protected synchronized void addApplication(ApplicationId applicationId,
LOG.info("Accepted application " + applicationId + " from user: " + user
+ ", in queue: " + queueName + ", currently num of applications: "
+ applications.size());
- rmContext.getDispatcher().getEventHandler()
- .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+ notifyAppAccepted(applicationId);
}
/**
@@ -635,9 +630,7 @@ protected synchronized void addApplicationAttempt(
LOG.info("Added Application Attempt " + applicationAttemptId
+ " to scheduler from user: " + user);
- rmContext.getDispatcher().getEventHandler().handle(
- new RMAppAttemptEvent(applicationAttemptId,
- RMAppAttemptEventType.ATTEMPT_ADDED));
+ notifyAttemptAdded(applicationAttemptId);
}
/**
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index bc3441b..45a11fa 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -56,11 +56,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@@ -76,6 +72,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -178,6 +175,17 @@ public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
public ActiveUsersManager getActiveUsersManager() {
return activeUsersManager;
}
+
+ @Override
+ public void recoverContainer(Resource clusterResource,
+ SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) {
+ if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+ return;
+ }
+ increaseUsedResources(rmContainer);
+ updateAppHeadRoom(schedulerAttempt);
+ updateAvailableResourcesMetrics();
+ }
};
@Override
@@ -328,8 +336,8 @@ public synchronized void addApplication(ApplicationId applicationId,
metrics.submitApp(user);
LOG.info("Accepted application " + applicationId + " from user: " + user
+ ", currently num of applications: " + applications.size());
- rmContext.getDispatcher().getEventHandler()
- .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+
+ notifyAppAccepted(applicationId);
}
@VisibleForTesting
@@ -353,9 +361,8 @@ public synchronized void addApplication(ApplicationId applicationId,
metrics.submitAppAttempt(user);
LOG.info("Added Application Attempt " + appAttemptId
+ " to scheduler from user " + application.getUser());
- rmContext.getDispatcher().getEventHandler().handle(
- new RMAppAttemptEvent(appAttemptId,
- RMAppAttemptEventType.ATTEMPT_ADDED));
+
+ notifyAttemptAdded(appAttemptId);
}
private synchronized void doneApplication(ApplicationId applicationId,
@@ -465,7 +472,7 @@ private void assignContainers(FiCaSchedulerNode node) {
if (attempt == null) {
continue;
}
- attempt.setHeadroom(Resources.subtract(clusterResource, usedResource));
+ updateAppHeadRoom(attempt);
}
}
@@ -636,11 +643,10 @@ private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application
application.allocate(type, node, priority, request, container);
// Inform the node
- node.allocateContainer(application.getApplicationId(),
- rmContainer);
+ node.allocateContainer(rmContainer);
// Update usage for this container
- Resources.addTo(usedResource, capability);
+ increaseUsedResources(rmContainer);
}
}
@@ -684,9 +690,22 @@ private synchronized void nodeUpdate(RMNode rmNode) {
LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = "
+ node.getAvailableResource());
}
-
- metrics.setAvailableResourcesToQueue(
- Resources.subtract(clusterResource, usedResource));
+
+ updateAvailableResourcesMetrics();
+ }
+
+ private void increaseUsedResources(RMContainer rmContainer) {
+ Resources.addTo(usedResource, rmContainer.getAllocatedResource());
+ }
+
+ private void updateAppHeadRoom(SchedulerApplicationAttempt schedulerAttempt) {
+ schedulerAttempt.setHeadroom(Resources.subtract(clusterResource,
+ usedResource));
+ }
+
+ private void updateAvailableResourcesMetrics() {
+ metrics.setAvailableResourcesToQueue(Resources.subtract(clusterResource,
+ usedResource));
}
@Override
@@ -696,6 +715,9 @@ public void handle(SchedulerEvent event) {
{
NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
addNode(nodeAddedEvent.getAddedRMNode());
+ recoverContainersOnNode(nodeAddedEvent.getContainerReports(),
+ nodeAddedEvent.getAddedRMNode());
+
}
break;
case NODE_REMOVED:
@@ -900,4 +922,8 @@ public synchronized boolean checkAccess(UserGroupInformation callerUGI,
return null;
}
}
+
+ public Resource getUsedResource() {
+ return usedResource;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
index ce5dd96..d05bd87 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
@@ -54,6 +57,9 @@
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.Task.State;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -170,6 +176,15 @@ public synchronized void submit() throws IOException, YarnException {
AppAddedSchedulerEvent addAppEvent =
new AppAddedSchedulerEvent(this.applicationId, this.queue, "user");
scheduler.handle(addAppEvent);
+
+ // fake the attempt to avoid NPE in test.
+ RMAppAttempt attempt = mock(RMAppAttempt.class);
+ when(attempt.getState()).thenReturn(RMAppAttemptState.SUBMITTED);
+ RMApp spyApp =
+ spy(resourceManager.getRMContext().getRMApps().get(applicationId));
+ when(spyApp.getRMAppAttempt(applicationAttemptId)).thenReturn(attempt);
+ resourceManager.getRMContext().getRMApps().put(applicationId, spyApp);
+
AppAttemptAddedSchedulerEvent addAttemptEvent =
new AppAttemptAddedSchedulerEvent(this.applicationAttemptId, false);
scheduler.handle(addAttemptEvent);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
index cfd05f9..d28043d 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
@@ -23,8 +23,6 @@
import java.util.ArrayList;
import java.util.List;
-import org.junit.Assert;
-
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
@@ -34,6 +32,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
@@ -44,6 +43,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
public class MockAM {
@@ -140,6 +140,24 @@ public AllocateResponse allocate(
return allocate(reqs, releases);
}
+ public List allocateContainers(int numContainers,
+ int memory, MockNM nm) throws Exception {
+ allocate(nm.getNodeId().getHost(), memory, numContainers,
+ new ArrayList());
+ List containers = new ArrayList();
+ int timeoutSecs = 0;
+ do {
+ nm.nodeHeartbeat(true);
+ containers.addAll(allocate(new ArrayList(),
+ new ArrayList()).getAllocatedContainers());
+ Thread.sleep(200);
+ System.out.println("Attempt " + this.attemptId + " is waiting for "
+ + numContainers + " containers to be allocated. Currently has "
+ + containers.size() + " containers allocated.");
+ } while (containers.size() != numContainers && timeoutSecs++ < 60);
+ return containers;
+ }
+
public List createReq(String[] hosts, int memory, int priority,
int containers) throws Exception {
List reqs = new ArrayList();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
index 1dcac06..cf8b408 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -32,7 +31,7 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ContainerRecoveryReport;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
@@ -105,14 +104,14 @@ public RegisterNodeManagerResponse registerNode() throws Exception {
}
public RegisterNodeManagerResponse registerNode(
- List containerStatus) throws Exception{
+ List containerReports) throws Exception{
RegisterNodeManagerRequest req = Records.newRecord(
RegisterNodeManagerRequest.class);
req.setNodeId(nodeId);
req.setHttpPort(httpPort);
Resource resource = BuilderUtils.newResource(memory, vCores);
req.setResource(resource);
- req.setContainerStatuses(containerStatus);
+ req.setContainerRecoveryReports(containerReports);
req.setNMVersion(version);
RegisterNodeManagerResponse registrationResponse =
resourceTracker.registerNodeManager(req);
@@ -185,4 +184,11 @@ public NodeHeartbeatResponse nodeHeartbeat(Map containers = attempt.getJustFinishedContainers();
+ System.out.println("Received completed containers " + containers);
+ for (ContainerStatus container : containers) {
+ if (container.getContainerId().equals(
+ completedContainer.getContainerId())) {
+ return;
+ }
+ }
+ Thread.sleep(200);
+ }
+ }
+
public void waitForState(MockNM nm, ContainerId containerId,
RMContainerState containerState) throws Exception {
RMContainer container = getResourceScheduler().getRMContainer(containerId);
int timeoutSecs = 0;
- while(container == null && timeoutSecs++ < 20) {
+ while(container == null && timeoutSecs++ < 100) {
nm.nodeHeartbeat(true);
container = getResourceScheduler().getRMContainer(containerId);
System.out.println("Waiting for container " + containerId + " to be allocated.");
@@ -333,7 +354,7 @@ public MockNM registerNode(String nodeIdStr, int memory, int vCores)
public void sendNodeStarted(MockNM nm) throws Exception {
RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get(
nm.getNodeId());
- node.handle(new RMNodeEvent(nm.getNodeId(), RMNodeEventType.STARTED));
+ node.handle(new RMNodeStartEvent(nm.getNodeId(), null));
}
public void sendNodeLost(MockNM nm) throws Exception {
@@ -542,4 +563,12 @@ public ApplicationReport getApplicationReport(ApplicationId appId)
.newInstance(appId));
return response.getApplicationReport();
}
+
+ // Explicitly reset queue metrics for testing.
+ @SuppressWarnings("static-access")
+ public void clearQueueMetrics(RMApp app) {
+ ((AbstractYarnScheduler) getResourceScheduler())
+ .getSchedulerApplications().get(app.getApplicationId()).getQueue()
+ .getMetrics().clearQueueMetrics();
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java
index b80a6bc..36153de 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestMoveApplication.java
@@ -43,10 +43,11 @@
public class TestMoveApplication {
private ResourceManager resourceManager = null;
private static boolean failMove;
-
+ private Configuration conf;
+
@Before
public void setUp() throws Exception {
- Configuration conf = new YarnConfiguration();
+ conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoSchedulerWithMove.class,
FifoSchedulerWithMove.class);
conf.set(YarnConfiguration.YARN_ADMIN_ACL, " ");
@@ -119,28 +120,23 @@ public void testMoveTooLate() throws Exception {
}
}
- @Test (timeout = 5000)
- public void testMoveSuccessful() throws Exception {
- // Submit application
- Application application = new Application("user1", resourceManager);
- ApplicationId appId = application.getApplicationId();
- application.submit();
-
- // Wait for app to be accepted
- RMApp app = resourceManager.rmContext.getRMApps().get(appId);
- while (app.getState() != RMAppState.ACCEPTED) {
- Thread.sleep(100);
- }
-
- ClientRMService clientRMService = resourceManager.getClientRMService();
+ @Test (timeout = 10000)
+ public
+ void testMoveSuccessful() throws Exception {
+ MockRM rm1 = new MockRM(conf);
+ rm1.start();
+ RMApp app = rm1.submitApp(1024);
+ ClientRMService clientRMService = rm1.getClientRMService();
// FIFO scheduler does not support moves
- clientRMService.moveApplicationAcrossQueues(
- MoveApplicationAcrossQueuesRequest.newInstance(appId, "newqueue"));
-
- RMApp rmApp = resourceManager.getRMContext().getRMApps().get(appId);
+ clientRMService
+ .moveApplicationAcrossQueues(MoveApplicationAcrossQueuesRequest
+ .newInstance(app.getApplicationId(), "newqueue"));
+
+ RMApp rmApp = rm1.getRMContext().getRMApps().get(app.getApplicationId());
assertEquals("newqueue", rmApp.getQueue());
+ rm1.stop();
}
-
+
@Test
public void testMoveRejectedByPermissions() throws Exception {
failMove = true;
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
index 3bcde8d..6a5ffa9 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
@@ -21,15 +21,14 @@
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import org.junit.Assert;
-
import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -49,6 +48,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -61,6 +61,7 @@
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
@@ -160,7 +161,7 @@ private RMNodeStatusEvent getMockRMNodeStatusEvent() {
@Test (timeout = 5000)
public void testExpiredContainer() {
// Start the node
- node.handle(new RMNodeEvent(null, RMNodeEventType.STARTED));
+ node.handle(new RMNodeStartEvent(null, null));
verify(scheduler).handle(any(NodeAddedSchedulerEvent.class));
// Expire a container
@@ -188,11 +189,11 @@ public void testExpiredContainer() {
@Test (timeout = 5000)
public void testContainerUpdate() throws InterruptedException{
//Start the node
- node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
+ node.handle(new RMNodeStartEvent(null, null));
NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1);
RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
- node2.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
+ node2.handle(new RMNodeStartEvent(null, null));
ContainerId completedContainerIdFromNode1 = BuilderUtils.newContainerId(
BuilderUtils.newApplicationAttemptId(
@@ -248,7 +249,7 @@ public void testContainerUpdate() throws InterruptedException{
@Test (timeout = 5000)
public void testStatusChange(){
//Start the node
- node.handle(new RMNodeEvent(null,RMNodeEventType.STARTED));
+ node.handle(new RMNodeStartEvent(null, null));
//Add info to the queue first
node.setNextHeartBeat(false);
@@ -464,7 +465,7 @@ private RMNodeImpl getRunningNode(String nmVersion) {
RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0,
null, ResourceOption.newInstance(capability,
RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), nmVersion);
- node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED));
+ node.handle(new RMNodeStartEvent(node.getNodeID(), null));
Assert.assertEquals(NodeState.RUNNING, node.getState());
return node;
}
@@ -495,7 +496,7 @@ public void testAdd() {
int initialUnhealthy = cm.getUnhealthyNMs();
int initialDecommissioned = cm.getNumDecommisionedNMs();
int initialRebooted = cm.getNumRebootedNMs();
- node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.STARTED));
+ node.handle(new RMNodeStartEvent(node.getNodeID(), null));
Assert.assertEquals("Active Nodes", initialActive + 1, cm.getNumActiveNMs());
Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
Assert.assertEquals("Unhealthy Nodes",
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
index 49eff8b..34622fe 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
@@ -30,6 +30,7 @@
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@@ -67,13 +68,13 @@
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ContainerRecoveryReport;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
@@ -303,13 +304,11 @@ public void testRMRestart() throws Exception {
nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
nm2 = new MockNM("127.0.0.2:5678", 15120, rm2.getResourceTrackerService());
- List containerStatuses = new ArrayList();
- ContainerStatus containerStatus =
- BuilderUtils.newContainerStatus(BuilderUtils.newContainerId(loadedApp1
- .getCurrentAppAttempt().getAppAttemptId(), 1),
- ContainerState.COMPLETE, "Killed AM container", 143);
- containerStatuses.add(containerStatus);
- nm1.registerNode(containerStatuses);
+ ContainerRecoveryReport report =
+ TestWorkPreservingRMRestart
+ .createContainerRecoveryReport(loadedApp1.getCurrentAppAttempt()
+ .getAppAttemptId(), 1, ContainerState.COMPLETE);
+ nm1.registerNode(Arrays.asList(report));
nm2.registerNode();
rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED);
@@ -510,14 +509,11 @@ public void testRMRestartWaitForPreviousAMToFinish() throws Exception {
Assert.assertEquals(RMAppAttemptState.LAUNCHED,
rmApp.getAppAttempts().get(am2.getApplicationAttemptId())
.getAppAttemptState());
-
- List containerStatuses = new ArrayList();
- ContainerStatus containerStatus =
- BuilderUtils.newContainerStatus(
- BuilderUtils.newContainerId(am2.getApplicationAttemptId(), 1),
- ContainerState.COMPLETE, "Killed AM container", 143);
- containerStatuses.add(containerStatus);
- nm1.registerNode(containerStatuses);
+
+ ContainerRecoveryReport report =
+ TestWorkPreservingRMRestart.createContainerRecoveryReport(
+ am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+ nm1.registerNode(Arrays.asList(report));
rm2.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
launchAM(rmApp, rm2, nm1);
Assert.assertEquals(3, rmApp.getAppAttempts().size());
@@ -1671,13 +1667,12 @@ public void testQueueMetricsOnRMRestart() throws Exception {
am1.allocate(new ArrayList(), new ArrayList());
nm1.nodeHeartbeat(true);
nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
- List containerStatuses = new ArrayList();
- ContainerStatus containerStatus =
- BuilderUtils.newContainerStatus(BuilderUtils.newContainerId(loadedApp1
- .getCurrentAppAttempt().getAppAttemptId(), 1),
- ContainerState.COMPLETE, "Killed AM container", 143);
- containerStatuses.add(containerStatus);
- nm1.registerNode(containerStatuses);
+
+ ContainerRecoveryReport report =
+ TestWorkPreservingRMRestart
+ .createContainerRecoveryReport(loadedApp1.getCurrentAppAttempt()
+ .getAppAttemptId(), 1, ContainerState.COMPLETE);
+ nm1.registerNode(Arrays.asList(report));
while (loadedApp1.getAppAttempts().size() != 2) {
Thread.sleep(200);
}
@@ -1801,12 +1796,10 @@ protected void serviceStart() throws Exception {
// ResourceTrackerService is started.
super.serviceStart();
nm1.setResourceTrackerService(getResourceTrackerService());
- List status = new ArrayList();
- ContainerId amContainer =
- ContainerId.newInstance(am0.getApplicationAttemptId(), 1);
- status.add(ContainerStatus.newInstance(amContainer,
- ContainerState.COMPLETE, "AM container exit", 143));
- nm1.registerNode(status);
+ ContainerRecoveryReport report =
+ TestWorkPreservingRMRestart.createContainerRecoveryReport(
+ am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+ nm1.registerNode(Arrays.asList(report));
}
};
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
new file mode 100644
index 0000000..8f500c6
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
@@ -0,0 +1,556 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ContainerRecoveryReport;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@SuppressWarnings({"rawtypes", "unchecked"})
+@RunWith(value = Parameterized.class)
+public class TestWorkPreservingRMRestart {
+
+ private YarnConfiguration conf;
+ private Class> schedulerClass;
+ MockRM rm1 = null;
+ MockRM rm2 = null;
+
+ @Before
+ public void setup() throws UnknownHostException {
+ Logger rootLogger = LogManager.getRootLogger();
+ rootLogger.setLevel(Level.DEBUG);
+ conf = new YarnConfiguration();
+ UserGroupInformation.setConfiguration(conf);
+ conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
+ conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, schedulerClass,
+ ResourceScheduler.class);
+ DefaultMetricsSystem.setMiniClusterMode(true);
+ }
+
+ @After
+ public void tearDown() {
+ if (rm1 != null) {
+ rm1.stop();
+ }
+ if (rm2 != null) {
+ rm2.stop();
+ }
+ }
+
+ @Parameterized.Parameters
+ public static Collection