diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 9a1eb54..6923f48 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1275,7 +1275,17 @@ private static void addDeprecatedKeys() {
public static final boolean DEFAULT_NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE =
false;
-
+  // Oversubscription (= allocation based on utilization) configs.
+
+  /**
+   * Key for the over-allocation threshold. yarn-default.xml describes it as
+   * a float between 0 and 0.95; a value of 0 keeps over-allocation off.
+   */
+  public static final String NM_OVERSUBSCRIPTION_ALLOCATION_THRESHOLD =
+      NM_PREFIX + "oversubscription.allocation-threshold";
+  /** Over-allocation is turned off by default. */
+  public static final float DEFAULT_NM_OVERSUBSCRIPTION_ALLOCATION_THRESHOLD
+      = 0f;
+  /** Upper bound of the documented range for the allocation threshold. */
+  @Private
+  public static final float MAX_NM_OVERSUBSCRIPTION_ALLOCATION_THRESHOLD = 0.95f;
+
+  /**
+   * Key for the utilization beyond which OPPORTUNISTIC containers should
+   * start getting preempted on an over-allocated node.
+   */
+  public static final String NM_OVERSUBSCRIPTION_PREEMPTION_THRESHOLD =
+      NM_PREFIX + "oversubscription.preemption-threshold";
+  /**
+   * Kept equal to the value shipped in yarn-default.xml (1) so that the
+   * in-code fallback and the packaged default cannot disagree.
+   */
+  public static final float DEFAULT_NM_OVERSUBSCRIPTION_PREEMPTION_THRESHOLD
+      = 1f;
/**
* Interval of time the linux container executor should try cleaning up
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java
index 338198b..bbe325b 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java
@@ -68,7 +68,7 @@ public void testResourceTrackerOnHA() throws Exception {
failoverThread = createAndStartFailoverThread();
NodeStatus status =
NodeStatus.newInstance(NodeId.newInstance("localhost", 0), 0, null,
- null, null, null, null, null);
+ null, null, null, null, null, null);
NodeHeartbeatRequest request2 =
NodeHeartbeatRequest.newInstance(status, null, null,null);
resourceTracker.nodeHeartbeat(request2);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 49cced6..f7f6338 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1447,6 +1447,27 @@
+ The extent of over-allocation (container-allocation based on
+ current utilization) allowed on this node, expressed as a float between
+ 0 and 0.95. By default, over-allocation is turned off (value = 0). When
+ turned on, the node allows running OPPORTUNISTIC containers when the
+ aggregate utilization is under the value specified here multiplied by
+ the node's advertised capacity.
+
+ yarn.nodemanager.oversubscription.allocation-threshold
+ 0f
+
+
+
+ When a node is over-allocated to improve utilization by
+ running OPPORTUNISTIC containers, this config captures the utilization
+ beyond which OPPORTUNISTIC containers should start getting preempted.
+
+ yarn.nodemanager.oversubscription.preemption-threshold
+ 1
+
+
+
This configuration setting determines the capabilities
assigned to docker containers when they are launched. While these may not
be case-sensitive from a docker perspective, it is best to keep these
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
index 836cd4b..805aa29 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
@@ -59,7 +59,8 @@ public static NodeStatus newInstance(NodeId nodeId, int responseId,
NodeHealthStatus nodeHealthStatus,
ResourceUtilization containersUtilization,
ResourceUtilization nodeUtilization,
- List increasedContainers) {
+ List increasedContainers,
+ OverAllocationInfo overAllocationInfo) {
NodeStatus nodeStatus = Records.newRecord(NodeStatus.class);
nodeStatus.setResponseId(responseId);
nodeStatus.setNodeId(nodeId);
@@ -69,12 +70,17 @@ public static NodeStatus newInstance(NodeId nodeId, int responseId,
nodeStatus.setContainersUtilization(containersUtilization);
nodeStatus.setNodeUtilization(nodeUtilization);
nodeStatus.setIncreasedContainers(increasedContainers);
+ nodeStatus.setOverAllocationInfo(overAllocationInfo);
return nodeStatus;
}
public abstract NodeId getNodeId();
public abstract int getResponseId();
-
+
+ public abstract OverAllocationInfo getOverAllocationInfo();
+ public abstract void setOverAllocationInfo(
+ OverAllocationInfo overAllocationInfo);
+
public abstract List getContainersStatuses();
public abstract void setContainersStatuses(
List containersStatuses);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/OverAllocationInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/OverAllocationInfo.java
new file mode 100644
index 0000000..77952bf
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/OverAllocationInfo.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.server.api.records.impl.pb
+ .OverAllocationInfoPBImpl;
+
+/**
+ * Node-specific record describing how aggressively the scheduler may
+ * over-allocate OPPORTUNISTIC containers on a node. It is sent on the wire
+ * with each heartbeat.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class OverAllocationInfo {
+
+  /**
+   * Builds a PB-backed record holding the supplied thresholds.
+   *
+   * @param overAllocationThresholds thresholds to store in the new record
+   * @return the newly created record
+   */
+  public static OverAllocationInfo newInstance(
+      ResourceThresholds overAllocationThresholds) {
+    final OverAllocationInfo created = new OverAllocationInfoPBImpl();
+    created.setOverAllocationThreshold(overAllocationThresholds);
+    return created;
+  }
+
+  /**
+   * @return the over-allocation thresholds carried by this record
+   */
+  public abstract ResourceThresholds getOverAllocationThresholds();
+
+  /**
+   * @param resourceThresholds the over-allocation thresholds to carry
+   */
+  public abstract void setOverAllocationThreshold(
+      ResourceThresholds resourceThresholds);
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ResourceThresholds.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ResourceThresholds.java
new file mode 100644
index 0000000..4639f00
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ResourceThresholds.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.server.api.records.impl.pb.ResourceThresholdsPBImpl;
+
+/**
+ * Holds the resource thresholds used for over-allocation and preemption
+ * when oversubscription is turned on.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class ResourceThresholds {
+
+  /**
+   * Builds a PB-backed record that uses the same value for both the memory
+   * and the CPU threshold.
+   *
+   * @param threshold value applied to memory and CPU alike
+   * @return the newly created record
+   */
+  public static ResourceThresholds newInstance(float threshold) {
+    final ResourceThresholds created = new ResourceThresholdsPBImpl();
+    created.setMemoryThreshold(threshold);
+    created.setCpuThreshold(threshold);
+    return created;
+  }
+
+  /** @return the memory threshold */
+  public abstract float getMemoryThreshold();
+
+  /** @return the CPU threshold */
+  public abstract float getCpuThreshold();
+
+  /** @param memoryThreshold the memory threshold to store */
+  public abstract void setMemoryThreshold(float memoryThreshold);
+
+  /** @param cpuThreshold the CPU threshold to store */
+  public abstract void setCpuThreshold(float cpuThreshold);
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
index 8dd4832..e30defa 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
@@ -40,9 +40,11 @@
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.OverAllocationInfoProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceUtilizationProto;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
public class NodeStatusPBImpl extends NodeStatus {
NodeStatusProto proto = NodeStatusProto.getDefaultInstance();
@@ -50,6 +52,7 @@
boolean viaProto = false;
private NodeId nodeId = null;
+ private OverAllocationInfo overAllocationInfo = null;
private List containers = null;
private NodeHealthStatus nodeHealthStatus = null;
private List keepAliveApplications = null;
@@ -75,6 +78,10 @@ private synchronized void mergeLocalToBuilder() {
if (this.nodeId != null) {
builder.setNodeId(convertToProtoFormat(this.nodeId));
}
+ if (this.overAllocationInfo != null) {
+ builder.setOverAllocationInfo(
+ convertToProtoFormat(this.overAllocationInfo));
+ }
if (this.containers != null) {
addContainersToProto();
}
@@ -224,6 +231,29 @@ public synchronized int getResponseId() {
NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
return p.getResponseId();
}
+
+  /**
+   * Returns the over-allocation info of this node status.
+   *
+   * The locally cached value wins; otherwise the value is read from the
+   * current proto/builder and cached. Synchronized like the other accessors
+   * of this class because it reads {@code viaProto}, {@code proto} and
+   * {@code builder} and writes the cache field.
+   *
+   * @return the over-allocation info, or {@code null} when none is cached
+   *         and the underlying proto has none
+   */
+  @Override
+  public synchronized OverAllocationInfo getOverAllocationInfo() {
+    if (this.overAllocationInfo != null) {
+      return this.overAllocationInfo;
+    }
+    NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasOverAllocationInfo()) {
+      return null;
+    }
+    this.overAllocationInfo = convertFromProtoFormat(p.getOverAllocationInfo());
+    return this.overAllocationInfo;
+  }
+
+ @Override
+ public void setOverAllocationInfo(OverAllocationInfo overAllocationInfo) {
+ maybeInitBuilder();
+ if (this.overAllocationInfo == null) {
+ builder.clearOverAllocationInfo();
+ }
+ this.overAllocationInfo = overAllocationInfo;
+ }
+
@Override
public synchronized void setResponseId(int responseId) {
maybeInitBuilder();
@@ -403,11 +433,21 @@ public synchronized void setIncreasedContainers(
private NodeIdProto convertToProtoFormat(NodeId nodeId) {
return ((NodeIdPBImpl)nodeId).getProto();
}
-
+
private NodeId convertFromProtoFormat(NodeIdProto proto) {
return new NodeIdPBImpl(proto);
}
+ private static OverAllocationInfoProto convertToProtoFormat(
+ OverAllocationInfo overAllocationInfo) {
+ return ((OverAllocationInfoPBImpl)overAllocationInfo).getProto();
+ }
+
+ private static OverAllocationInfo convertFromProtoFormat(
+ OverAllocationInfoProto proto) {
+ return new OverAllocationInfoPBImpl(proto);
+ }
+
private NodeHealthStatusProto convertToProtoFormat(
NodeHealthStatus healthStatus) {
return ((NodeHealthStatusPBImpl) healthStatus).getProto();
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/OverAllocationInfoPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/OverAllocationInfoPBImpl.java
new file mode 100644
index 0000000..8731733
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/OverAllocationInfoPBImpl.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.records.impl.pb;
+
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.OverAllocationInfoProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.OverAllocationInfoProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.ResourceThresholdsProto;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
+import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;
+
+/**
+ * Protobuf-backed implementation of {@link OverAllocationInfo}.
+ *
+ * State lives either in the immutable {@code proto} (when {@code viaProto}
+ * is true) or in {@code builder}. The thresholds record is additionally
+ * cached in a local field and merged back into the builder by
+ * {@link #getProto()}.
+ */
+public class OverAllocationInfoPBImpl extends OverAllocationInfo {
+  OverAllocationInfoProto proto = OverAllocationInfoProto.getDefaultInstance();
+  OverAllocationInfoProto.Builder builder = null;
+  boolean viaProto = false;
+
+  // Locally cached thresholds; null until set or first read from the proto.
+  private ResourceThresholds overAllocationThresholds = null;
+
+  /** Creates an empty record backed by a fresh builder. */
+  public OverAllocationInfoPBImpl() {
+    builder = OverAllocationInfoProto.newBuilder();
+  }
+
+  /**
+   * Creates a record backed by an existing proto.
+   *
+   * @param proto the proto to read from
+   */
+  public OverAllocationInfoPBImpl(OverAllocationInfoProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  /**
+   * Merges the locally cached state into the proto and returns it.
+   *
+   * @return the proto reflecting the current state of this record
+   */
+  public synchronized OverAllocationInfoProto getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  /** Folds the cached fields into the builder and rebuilds {@code proto}. */
+  private synchronized void mergeLocalToProto() {
+    if (viaProto) {
+      maybeInitBuilder();
+    }
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  /** Copies the cached thresholds, if any, into the builder. */
+  private synchronized void mergeLocalToBuilder() {
+    if (overAllocationThresholds != null) {
+      builder.setOverAllocationThresholds(
+          convertToProtoFormat(overAllocationThresholds));
+    }
+  }
+
+  /** Ensures a builder seeded from {@code proto} exists and is in use. */
+  private synchronized void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = OverAllocationInfoProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  /**
+   * @return the cached thresholds, else the ones read from the proto or
+   *         builder, or {@code null} when neither has any
+   */
+  @Override
+  public synchronized ResourceThresholds getOverAllocationThresholds() {
+    if (overAllocationThresholds != null) {
+      return overAllocationThresholds;
+    }
+    OverAllocationInfoProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasOverAllocationThresholds()) {
+      return null;
+    }
+    overAllocationThresholds =
+        convertFromProtoFormat(p.getOverAllocationThresholds());
+    return overAllocationThresholds;
+  }
+
+  /**
+   * @param resourceThresholds the thresholds to store; {@code null} removes
+   *                           them from this record
+   */
+  @Override
+  public synchronized void setOverAllocationThreshold(
+      ResourceThresholds resourceThresholds) {
+    maybeInitBuilder();
+    if (resourceThresholds == null) {
+      // The check must look at the incoming value: when it is null, whatever
+      // the builder inherited from the proto has to be dropped, otherwise the
+      // getter and getProto() would keep returning the stale thresholds.
+      builder.clearOverAllocationThresholds();
+    }
+    this.overAllocationThresholds = resourceThresholds;
+  }
+
+  /** Unwraps the proto backing a PB-implemented thresholds record. */
+  private static ResourceThresholdsProto convertToProtoFormat(
+      ResourceThresholds overAllocationThresholds) {
+    return ((ResourceThresholdsPBImpl) overAllocationThresholds).getProto();
+  }
+
+  /** Wraps a thresholds proto in its PB-backed record. */
+  private static ResourceThresholds convertFromProtoFormat(
+      ResourceThresholdsProto overAllocationThresholdsProto) {
+    return new ResourceThresholdsPBImpl(overAllocationThresholdsProto);
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ResourceThresholdsPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ResourceThresholdsPBImpl.java
new file mode 100644
index 0000000..4851895
--- /dev/null
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ResourceThresholdsPBImpl.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.records.impl.pb;
+
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.ResourceThresholdsProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.ResourceThresholdsProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;
+
+/**
+ * Protobuf-backed implementation of {@link ResourceThresholds}. Both
+ * thresholds are stored directly in the proto/builder; nothing is cached
+ * locally.
+ */
+public class ResourceThresholdsPBImpl extends ResourceThresholds {
+  ResourceThresholdsProto proto = ResourceThresholdsProto.getDefaultInstance();
+  ResourceThresholdsProto.Builder builder = null;
+  boolean viaProto = false;
+
+  /** Creates an empty record backed by a fresh builder. */
+  public ResourceThresholdsPBImpl() {
+    this.builder = ResourceThresholdsProto.newBuilder();
+  }
+
+  /**
+   * Creates a record backed by an existing proto.
+   *
+   * @param proto the proto to read from
+   */
+  public ResourceThresholdsPBImpl(ResourceThresholdsProto proto) {
+    this.viaProto = true;
+    this.proto = proto;
+  }
+
+  /**
+   * @return the proto reflecting the current state of this record
+   */
+  public synchronized ResourceThresholdsProto getProto() {
+    mergeLocalToProto();
+    if (!viaProto) {
+      proto = builder.build();
+    }
+    viaProto = true;
+    return proto;
+  }
+
+  /** Folds local state into the builder and rebuilds {@code proto}. */
+  private synchronized void mergeLocalToProto() {
+    if (viaProto) {
+      maybeInitBuilder();
+    }
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private synchronized void mergeLocalToBuilder() {
+    /*
+     * Right now, we have only memory and cpu thresholds that are floats.
+     * This is a no-op until we add other non-static fields to the proto.
+     */
+  }
+
+  /** Ensures a builder seeded from {@code proto} exists and is in use. */
+  private synchronized void maybeInitBuilder() {
+    final boolean needsNewBuilder = viaProto || builder == null;
+    if (needsNewBuilder) {
+      builder = ResourceThresholdsProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  /** Picks whichever of proto/builder currently holds the state. */
+  private ResourceThresholdsProtoOrBuilder protoOrBuilder() {
+    if (viaProto) {
+      return proto;
+    }
+    return builder;
+  }
+
+  @Override
+  public synchronized float getMemoryThreshold() {
+    return protoOrBuilder().getMemory();
+  }
+
+  @Override
+  public synchronized float getCpuThreshold() {
+    return protoOrBuilder().getCpu();
+  }
+
+  @Override
+  public synchronized void setMemoryThreshold(float memoryThreshold) {
+    maybeInitBuilder();
+    builder.setMemory(memoryThreshold);
+  }
+
+  @Override
+  public synchronized void setCpuThreshold(float cpuThreshold) {
+    maybeInitBuilder();
+    builder.setCpu(cpuThreshold);
+  }
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
index 77064a0..30ef97f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
@@ -39,6 +39,7 @@ message NodeStatusProto {
optional ResourceUtilizationProto containers_utilization = 6;
optional ResourceUtilizationProto node_utilization = 7;
repeated ContainerProto increased_containers = 8;
+ optional OverAllocationInfoProto over_allocation_info = 9;
}
message MasterKeyProto {
@@ -57,3 +58,11 @@ message VersionProto {
optional int32 minor_version = 2;
}
+// Over-allocation settings; carried as the optional over_allocation_info
+// field of NodeStatusProto.
+message OverAllocationInfoProto {
+ optional ResourceThresholdsProto over_allocation_thresholds = 1;
+}
+
+// Memory and cpu thresholds as floats; each field defaults to 0 when unset.
+message ResourceThresholdsProto {
+ optional float memory = 1 [default = 0];
+ optional float cpu = 2 [default = 0];
+}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
index 9c2d1fb..2c460d2 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
@@ -87,4 +88,8 @@
ConcurrentLinkedQueue
getLogAggregationStatusForApps();
+
+ boolean isOverSubscriptionEnabled();
+
+ OverAllocationInfo getOverAllocationInfo();
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index a9a5411..9efc831 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -59,6 +59,7 @@
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -458,6 +459,7 @@ public void run() {
private boolean isDecommissioned = false;
private final ConcurrentLinkedQueue
logAggregationReportForApps;
+ private OverAllocationInfo overAllocationInfo;
public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInNM nmTokenSecretManager,
@@ -585,6 +587,20 @@ public void setSystemCrendentialsForApps(
getLogAggregationStatusForApps() {
return this.logAggregationReportForApps;
}
+
+    /**
+     * @return true when an over-allocation info has been set on this context
+     */
+    @Override
+    public boolean isOverSubscriptionEnabled() {
+      final OverAllocationInfo current = getOverAllocationInfo();
+      return current != null;
+    }
+
+    /**
+     * @return the over-allocation info held by this context, possibly null
+     */
+    @Override
+    public OverAllocationInfo getOverAllocationInfo() {
+      return overAllocationInfo;
+    }
+
+ public void setOverAllocationInfo(OverAllocationInfo overAllocationInfo) {
+ this.overAllocationInfo = overAllocationInfo;
+ }
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 5806731..5646929 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -437,8 +437,9 @@ protected NodeStatus getNodeStatus(int responseId) throws IOException {
= getIncreasedContainers();
NodeStatus nodeStatus =
NodeStatus.newInstance(nodeId, responseId, containersStatuses,
- createKeepAliveApplicationList(), nodeHealthStatus,
- containersUtilization, nodeUtilization, increasedContainers);
+ createKeepAliveApplicationList(), nodeHealthStatus,
+ containersUtilization, nodeUtilization, increasedContainers,
+ this.context.getOverAllocationInfo());
return nodeStatus;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index 446e7a1..768059a9 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -35,8 +35,13 @@
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
+import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;
+import org.apache.hadoop.yarn.server.api.records.impl.pb
+ .ResourceThresholdsPBImpl;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
@@ -83,6 +88,8 @@
private ResourceUtilization containersUtilization;
+ private ResourceThresholds oversubscriptionPreemptionThresholds;
+
private volatile boolean stopped = false;
public ContainersMonitorImpl(ContainerExecutor exec,
@@ -158,6 +165,14 @@ protected void serviceInit(Configuration conf) throws Exception {
LOG.info("Physical memory check enabled: " + pmemCheckEnabled);
LOG.info("Virtual memory check enabled: " + vmemCheckEnabled);
+ initializeOverSubscription(conf);
+ if (context.isOverSubscriptionEnabled()) {
+ // Oversubscription is enabled
+ pmemCheckEnabled = true;
+ LOG.info("Force enabling physical memory checks because " +
+ "oversubscription is enabled");
+ }
+
containersMonitorEnabled = isEnabled();
LOG.info("ContainersMonitor enabled: " + containersMonitorEnabled);
@@ -191,6 +206,25 @@ protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
}
+ private void initializeOverSubscription(Configuration conf) {
+ float overAllocationTreshold = conf.getFloat(
+ YarnConfiguration.NM_OVERSUBSCRIPTION_ALLOCATION_THRESHOLD,
+ YarnConfiguration.DEFAULT_NM_OVERSUBSCRIPTION_ALLOCATION_THRESHOLD);
+
+ if (overAllocationTreshold > 0f) {
+ ((NodeManager.NMContext) context).setOverAllocationInfo(
+ OverAllocationInfo.newInstance(
+ ResourceThresholds.newInstance(overAllocationTreshold)));
+
+ float preemptionThreshold = conf.getFloat(
+ YarnConfiguration.NM_OVERSUBSCRIPTION_PREEMPTION_THRESHOLD,
+ YarnConfiguration.DEFAULT_NM_OVERSUBSCRIPTION_PREEMPTION_THRESHOLD);
+
+ this.oversubscriptionPreemptionThresholds =
+ ResourceThresholds.newInstance(preemptionThreshold);
+ }
+ }
+
private boolean isEnabled() {
if (resourceCalculatorPlugin == null) {
LOG.info("ResourceCalculatorPlugin is unavailable on this system. "
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
index 9bc23f6..b9bc259 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
@@ -60,6 +60,7 @@
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitor;
@@ -674,6 +675,16 @@ public void setDecommissioned(boolean isDecommissioned) {
}
@Override
+ // This test context always reports oversubscription as disabled.
+ public boolean isOverSubscriptionEnabled() {
+ return false;
+ }
+
+ @Override
+ // This test context carries no over-allocation info.
+ public OverAllocationInfo getOverAllocationInfo() {
+ return null;
+ }
+
+ @Override
public NodeResourceMonitor getNodeResourceMonitor() {
return null;
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
index d1c9f6e..931b54f 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
@@ -674,7 +674,7 @@ public void testUpdateHeartbeatResponseForAppLifeCycle() {
NodeHealthStatus nodeHealth = NodeHealthStatus.newInstance(true,
"", System.currentTimeMillis());
NodeStatus nodeStatus = NodeStatus.newInstance(nodeId, 0, statusList, null,
- nodeHealth, null, null, null);
+ nodeHealth, null, null, null, null);
node.handle(new RMNodeStatusEvent(nodeId, nodeStatus, null));
Assert.assertEquals(1, node.getRunningApps().size());
@@ -724,7 +724,7 @@ private RMNodeImpl getUnhealthyNode() {
NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick",
System.currentTimeMillis());
NodeStatus nodeStatus = NodeStatus.newInstance(node.getNodeID(), 0,
- new ArrayList(), null, status, null, null, null);
+ new ArrayList(), null, status, null, null, null, null);
node.handle(new RMNodeStatusEvent(node.getNodeID(), nodeStatus, null));
Assert.assertEquals(NodeState.UNHEALTHY, node.getState());
return node;
@@ -922,7 +922,7 @@ public void testDecommissioningUnhealthy() {
NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick",
System.currentTimeMillis());
NodeStatus nodeStatus = NodeStatus.newInstance(node.getNodeID(), 0,
- new ArrayList(), null, status, null, null, null);
+ new ArrayList(), null, status, null, null, null, null);
node.handle(new RMNodeStatusEvent(node.getNodeID(), nodeStatus, null));
Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState());
}
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
index 087199d..0f1fd92 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
@@ -166,7 +166,7 @@ public void testLogAggregationStatus() throws Exception {
node1ReportForApp.add(report1);
NodeStatus nodeStatus1 = NodeStatus.newInstance(node1.getNodeID(), 0,
new ArrayList(), null,
- NodeHealthStatus.newInstance(true, null, 0), null, null, null);
+ NodeHealthStatus.newInstance(true, null, 0), null, null, null, null);
node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, null,
node1ReportForApp));
@@ -180,7 +180,7 @@ public void testLogAggregationStatus() throws Exception {
node2ReportForApp.add(report2);
NodeStatus nodeStatus2 = NodeStatus.newInstance(node2.getNodeID(), 0,
new ArrayList(), null,
- NodeHealthStatus.newInstance(true, null, 0), null, null, null);
+ NodeHealthStatus.newInstance(true, null, 0), null, null, null, null);
node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2, null,
node2ReportForApp));
// node1 and node2 has updated its log aggregation status
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
index 3fd1fd5..9d6ad5e 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java
@@ -145,7 +145,8 @@ public void testNodesDefaultWithUnHealthyNode() throws JSONException,
NodeHealthStatus nodeHealth = NodeHealthStatus.newInstance(false,
"test health report", System.currentTimeMillis());
NodeStatus nodeStatus = NodeStatus.newInstance(nm3.getNodeId(), 1,
- new ArrayList(), null, nodeHealth, null, null, null);
+ new ArrayList(), null, nodeHealth, null, null, null,
+ null);
node.handle(new RMNodeStatusEvent(nm3.getNodeId(), nodeStatus, null));
rm.NMwaitForState(nm3.getNodeId(), NodeState.UNHEALTHY);
@@ -657,7 +658,7 @@ public void testNodesResourceUtilization() throws JSONException, Exception {
2048, 0, (float) 5.05);
NodeStatus nodeStatus = NodeStatus.newInstance(nm1.getNodeId(), 0,
new ArrayList(), null, nodeHealth, containerResource,
- nodeResource, null);
+ nodeResource, null, null);
node.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus, null));
rm.NMwaitForState(nm1.getNodeId(), NodeState.RUNNING);
diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnClusterNodeUtilization.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnClusterNodeUtilization.java
index cf83e67..2a8c580 100644
--- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnClusterNodeUtilization.java
+++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnClusterNodeUtilization.java
@@ -190,6 +190,7 @@ private NodeStatus createNodeStatus(
NodeHealthStatus.newInstance(true, null, 0),
containersUtilization,
nodeUtilization,
+ null,
null);
return status;