diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 9a1eb54..6923f48 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1275,7 +1275,18 @@ private static void addDeprecatedKeys() { public static final boolean DEFAULT_NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE = false; - + /** Oversubscription (= allocation based on utilization) configs */ + public static final String NM_OVERSUBSCRIPTION_ALLOCATION_THRESHOLD = + NM_PREFIX + "oversubscription.allocation-threshold"; + public static final float DEFAULT_NM_OVERSUBSCRIPTION_ALLOCATION_THRESHOLD + = 0f; + @Private + public static final float MAX_NM_OVERSUBSCRIPTION_ALLOCATION_THRESHOLD = 0.95f; + public static final String NM_OVERSUBSCRIPTION_PREEMPTION_THRESHOLD = + NM_PREFIX + "oversubscription.preemption-threshold"; + /** Default preemption threshold; kept equal to the yarn-default.xml value. */ + public static final float DEFAULT_NM_OVERSUBSCRIPTION_PREEMPTION_THRESHOLD + = 1f; /** * Interval of time the linux container executor should try cleaning up diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java index 338198b..bbe325b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java @@ -68,7 +68,7 @@ public void testResourceTrackerOnHA() throws Exception { failoverThread = createAndStartFailoverThread(); 
NodeStatus status = NodeStatus.newInstance(NodeId.newInstance("localhost", 0), 0, null, - null, null, null, null, null); + null, null, null, null, null, null); NodeHeartbeatRequest request2 = NodeHeartbeatRequest.newInstance(status, null, null,null); resourceTracker.nodeHeartbeat(request2); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 49cced6..f7f6338 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1447,6 +1447,27 @@ + The extent of over-allocation (container-allocation based on + current utilization) allowed on this node, expressed as a float between + 0 and 0.95. By default, over-allocation is turned off (value = 0). When + turned on, the node allows running OPPORTUNISTIC containers when the + aggregate utilization is under the value specified here multiplied by + the node's advertised capacity. + + yarn.nodemanager.oversubscription.allocation-threshold + 0f + + + + When a node is over-allocated to improve utilization by + running OPPORTUNISTIC containers, this config captures the utilization + beyond which OPPORTUNISTIC containers should start getting preempted. + + yarn.nodemanager.oversubscription.preemption-threshold + 1 + + + This configuration setting determines the capabilities assigned to docker containers when they are launched. 
While these may not be case-sensitive from a docker perspective, it is best to keep these diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java index 836cd4b..805aa29 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java @@ -59,7 +59,8 @@ public static NodeStatus newInstance(NodeId nodeId, int responseId, NodeHealthStatus nodeHealthStatus, ResourceUtilization containersUtilization, ResourceUtilization nodeUtilization, - List increasedContainers) { + List increasedContainers, + OverAllocationInfo overAllocationInfo) { NodeStatus nodeStatus = Records.newRecord(NodeStatus.class); nodeStatus.setResponseId(responseId); nodeStatus.setNodeId(nodeId); @@ -69,12 +70,17 @@ public static NodeStatus newInstance(NodeId nodeId, int responseId, nodeStatus.setContainersUtilization(containersUtilization); nodeStatus.setNodeUtilization(nodeUtilization); nodeStatus.setIncreasedContainers(increasedContainers); + nodeStatus.setOverAllocationInfo(overAllocationInfo); return nodeStatus; } public abstract NodeId getNodeId(); public abstract int getResponseId(); - + + public abstract OverAllocationInfo getOverAllocationInfo(); + public abstract void setOverAllocationInfo( + OverAllocationInfo overAllocationInfo); + public abstract List getContainersStatuses(); public abstract void setContainersStatuses( List containersStatuses); diff --git 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/OverAllocationInfo.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/OverAllocationInfo.java new file mode 100644 index 0000000..77952bf --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/OverAllocationInfo.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.records; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.server.api.records.impl.pb + .OverAllocationInfoPBImpl; + +/** + * Captures information on how aggressively the scheduler can over-allocate + * OPPORTUNISTIC containers on a node. This is node-specific, and is sent on + * the wire on each heartbeat. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public abstract class OverAllocationInfo { + public static OverAllocationInfo newInstance( + ResourceThresholds overAllocationThresholds) { + OverAllocationInfo info = new OverAllocationInfoPBImpl(); + info.setOverAllocationThreshold(overAllocationThresholds); + return info; + } + + public abstract ResourceThresholds getOverAllocationThresholds(); + + public abstract void setOverAllocationThreshold( + ResourceThresholds resourceThresholds); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ResourceThresholds.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ResourceThresholds.java new file mode 100644 index 0000000..4639f00 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ResourceThresholds.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.api.records; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.server.api.records.impl.pb.ResourceThresholdsPBImpl; + +/** + * Captures resource thresholds to be used for over-allocation and preemption + * when oversubscription is turned on. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public abstract class ResourceThresholds { + public static ResourceThresholds newInstance(float threshold) { + ResourceThresholds thresholds = new ResourceThresholdsPBImpl(); + thresholds.setMemoryThreshold(threshold); + thresholds.setCpuThreshold(threshold); + return thresholds; + } + + public abstract float getMemoryThreshold(); + + public abstract float getCpuThreshold(); + + public abstract void setMemoryThreshold(float memoryThreshold); + + public abstract void setCpuThreshold(float cpuThreshold); +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java index 8dd4832..e30defa 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java @@ -40,9 +40,11 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProtoOrBuilder; +import 
org.apache.hadoop.yarn.proto.YarnServerCommonProtos.OverAllocationInfoProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceUtilizationProto; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo; public class NodeStatusPBImpl extends NodeStatus { NodeStatusProto proto = NodeStatusProto.getDefaultInstance(); @@ -50,6 +52,7 @@ boolean viaProto = false; private NodeId nodeId = null; + private OverAllocationInfo overAllocationInfo = null; private List containers = null; private NodeHealthStatus nodeHealthStatus = null; private List keepAliveApplications = null; @@ -75,6 +78,10 @@ private synchronized void mergeLocalToBuilder() { if (this.nodeId != null) { builder.setNodeId(convertToProtoFormat(this.nodeId)); } + if (this.overAllocationInfo != null) { + builder.setOverAllocationInfo( + convertToProtoFormat(this.overAllocationInfo)); + } if (this.containers != null) { addContainersToProto(); } @@ -224,6 +231,31 @@ public synchronized int getResponseId() { NodeStatusProtoOrBuilder p = viaProto ? proto : builder; return p.getResponseId(); } + + @Override + public synchronized OverAllocationInfo getOverAllocationInfo() { + NodeStatusProtoOrBuilder p = viaProto ? 
proto : builder; + if (this.overAllocationInfo != null) { + return this.overAllocationInfo; + } + if (!p.hasOverAllocationInfo()) { + return null; + } + this.overAllocationInfo = convertFromProtoFormat(p.getOverAllocationInfo()); + return this.overAllocationInfo; + } + + @Override + public synchronized void setOverAllocationInfo( + OverAllocationInfo overAllocationInfo) { + maybeInitBuilder(); + // Drop the serialized value when the caller resets the field to null. + if (overAllocationInfo == null) { + builder.clearOverAllocationInfo(); + } + this.overAllocationInfo = overAllocationInfo; + } + @Override public synchronized void setResponseId(int responseId) { maybeInitBuilder(); @@ -403,11 +435,21 @@ public synchronized void setIncreasedContainers( private NodeIdProto convertToProtoFormat(NodeId nodeId) { return ((NodeIdPBImpl)nodeId).getProto(); } - + private NodeId convertFromProtoFormat(NodeIdProto proto) { return new NodeIdPBImpl(proto); } + private static OverAllocationInfoProto convertToProtoFormat( + OverAllocationInfo overAllocationInfo) { + return ((OverAllocationInfoPBImpl)overAllocationInfo).getProto(); + } + + private static OverAllocationInfo convertFromProtoFormat( + OverAllocationInfoProto proto) { + return new OverAllocationInfoPBImpl(proto); + } + private NodeHealthStatusProto convertToProtoFormat( NodeHealthStatus healthStatus) { return ((NodeHealthStatusPBImpl) healthStatus).getProto(); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/OverAllocationInfoPBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/OverAllocationInfoPBImpl.java new file mode 100644 index 0000000..8731733 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/OverAllocationInfoPBImpl.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.api.records.impl.pb; + +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.OverAllocationInfoProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.OverAllocationInfoProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.ResourceThresholdsProto; +import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo; +import org.apache.hadoop.yarn.server.api.records.ResourceThresholds; + +public class OverAllocationInfoPBImpl extends OverAllocationInfo { + OverAllocationInfoProto proto = OverAllocationInfoProto.getDefaultInstance(); + OverAllocationInfoProto.Builder builder = null; + boolean viaProto = false; + + private ResourceThresholds overAllocationThresholds = null; + + public OverAllocationInfoPBImpl() { + builder = OverAllocationInfoProto.newBuilder(); + } + + public OverAllocationInfoPBImpl(OverAllocationInfoProto proto) { + this.proto = proto; + viaProto = true; + } + + public synchronized OverAllocationInfoProto getProto() { + mergeLocalToProto(); + proto = viaProto ? 
proto : builder.build(); + viaProto = true; + return proto; + } + + private synchronized void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private synchronized void mergeLocalToBuilder() { + if (overAllocationThresholds != null) { + builder.setOverAllocationThresholds( + convertToProtoFormat(overAllocationThresholds)); + } + } + + private synchronized void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = OverAllocationInfoProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public synchronized ResourceThresholds getOverAllocationThresholds() { + OverAllocationInfoProtoOrBuilder p = viaProto ? proto : builder; + if (overAllocationThresholds != null) { + return overAllocationThresholds; + } + if (!p.hasOverAllocationThresholds()) { + return null; + } + overAllocationThresholds = + convertFromProtoFormat(p.getOverAllocationThresholds()); + return overAllocationThresholds; + } + + @Override + public synchronized void setOverAllocationThreshold( + ResourceThresholds resourceThresholds) { + maybeInitBuilder(); + if (resourceThresholds == null) { /* resetting to null: drop the serialized value too */ + builder.clearOverAllocationThresholds(); + } + this.overAllocationThresholds = resourceThresholds; + } + + private static ResourceThresholdsProto convertToProtoFormat( + ResourceThresholds overAllocationThresholds) { + return ((ResourceThresholdsPBImpl) overAllocationThresholds).getProto(); + } + + private static ResourceThresholds convertFromProtoFormat( + ResourceThresholdsProto overAllocationThresholdsProto) { + return new ResourceThresholdsPBImpl(overAllocationThresholdsProto); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ResourceThresholdsPBImpl.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ResourceThresholdsPBImpl.java new file mode 100644 index 0000000..4851895 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ResourceThresholdsPBImpl.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.api.records.impl.pb; + +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.ResourceThresholdsProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.ResourceThresholdsProtoOrBuilder; +import org.apache.hadoop.yarn.server.api.records.ResourceThresholds; + +public class ResourceThresholdsPBImpl extends ResourceThresholds{ + ResourceThresholdsProto proto = ResourceThresholdsProto.getDefaultInstance(); + ResourceThresholdsProto.Builder builder = null; + boolean viaProto = false; + + public ResourceThresholdsPBImpl() { + builder = ResourceThresholdsProto.newBuilder(); + } + + public ResourceThresholdsPBImpl(ResourceThresholdsProto proto) { + this.proto = proto; + viaProto = true; + } + + public synchronized ResourceThresholdsProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private synchronized void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private synchronized void mergeLocalToBuilder() { + /* + * Right now, we have only memory and cpu thresholds that are floats. + * This is a no-op until we add other non-static fields to the proto. + */ + } + + private synchronized void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ResourceThresholdsProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public synchronized float getMemoryThreshold() { + ResourceThresholdsProtoOrBuilder p = viaProto ? proto : builder; + return p.getMemory(); + } + + @Override + public synchronized float getCpuThreshold() { + ResourceThresholdsProtoOrBuilder p = viaProto ? 
proto : builder; + return p.getCpu(); + } + + @Override + public synchronized void setMemoryThreshold(float memoryThreshold) { + maybeInitBuilder(); + builder.setMemory(memoryThreshold); + } + + @Override + public synchronized void setCpuThreshold(float cpuThreshold) { + maybeInitBuilder(); + builder.setCpu(cpuThreshold); + } +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto index 77064a0..30ef97f 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto @@ -39,6 +39,7 @@ message NodeStatusProto { optional ResourceUtilizationProto containers_utilization = 6; optional ResourceUtilizationProto node_utilization = 7; repeated ContainerProto increased_containers = 8; + optional OverAllocationInfoProto over_allocation_info = 9; } message MasterKeyProto { @@ -57,3 +58,11 @@ message VersionProto { optional int32 minor_version = 2; } +message OverAllocationInfoProto { + optional ResourceThresholdsProto over_allocation_thresholds = 1; +} + +message ResourceThresholdsProto { + optional float memory = 1 [default = 0]; + optional float cpu = 2 [default = 0]; +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 9c2d1fb..2c460d2 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; @@ -87,4 +88,8 @@ ConcurrentLinkedQueue getLogAggregationStatusForApps(); + + boolean isOverSubscriptionEnabled(); + + OverAllocationInfo getOverAllocationInfo(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index a9a5411..9efc831 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; @@ -458,6 +459,7 @@ public void run() { private boolean isDecommissioned = false; private final ConcurrentLinkedQueue logAggregationReportForApps; + private OverAllocationInfo overAllocationInfo; public NMContext(NMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInNM nmTokenSecretManager, @@ -585,6 +587,20 @@ public void setSystemCrendentialsForApps( getLogAggregationStatusForApps() { return this.logAggregationReportForApps; } + + @Override + public boolean isOverSubscriptionEnabled() { + return getOverAllocationInfo() != null; + } + + @Override + public OverAllocationInfo getOverAllocationInfo() { + return this.overAllocationInfo; + } + + public void setOverAllocationInfo(OverAllocationInfo overAllocationInfo) { + this.overAllocationInfo = overAllocationInfo; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 5806731..5646929 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -437,8 +437,9 @@ protected NodeStatus getNodeStatus(int responseId) throws IOException { = getIncreasedContainers(); NodeStatus nodeStatus = NodeStatus.newInstance(nodeId, responseId, containersStatuses, - createKeepAliveApplicationList(), nodeHealthStatus, - containersUtilization, nodeUtilization, 
increasedContainers); + createKeepAliveApplicationList(), nodeHealthStatus, + containersUtilization, nodeUtilization, increasedContainers, + this.context.getOverAllocationInfo()); return nodeStatus; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index 446e7a1..768059a9 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -35,8 +35,13 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo; +import org.apache.hadoop.yarn.server.api.records.ResourceThresholds; +import org.apache.hadoop.yarn.server.api.records.impl.pb + .ResourceThresholdsPBImpl; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; @@ -83,6 +88,8 @@ private ResourceUtilization containersUtilization; + private ResourceThresholds 
oversubscriptionPreemptionThresholds; + private volatile boolean stopped = false; public ContainersMonitorImpl(ContainerExecutor exec, @@ -158,6 +165,14 @@ protected void serviceInit(Configuration conf) throws Exception { LOG.info("Physical memory check enabled: " + pmemCheckEnabled); LOG.info("Virtual memory check enabled: " + vmemCheckEnabled); + initializeOverSubscription(conf); + if (context.isOverSubscriptionEnabled()) { + // Oversubscription is enabled + pmemCheckEnabled = true; + LOG.info("Force enabling physical memory checks because " + + "oversubscription is enabled"); + } + containersMonitorEnabled = isEnabled(); LOG.info("ContainersMonitor enabled: " + containersMonitorEnabled); @@ -191,6 +206,31 @@ protected void serviceInit(Configuration conf) throws Exception { super.serviceInit(conf); } + /** + * Reads the oversubscription settings from the configuration. Over-allocation + * stays off unless the allocation threshold is positive; the threshold is + * capped at MAX_NM_OVERSUBSCRIPTION_ALLOCATION_THRESHOLD. + */ + private void initializeOverSubscription(Configuration conf) { + float overAllocationThreshold = Math.min(conf.getFloat( + YarnConfiguration.NM_OVERSUBSCRIPTION_ALLOCATION_THRESHOLD, + YarnConfiguration.DEFAULT_NM_OVERSUBSCRIPTION_ALLOCATION_THRESHOLD), + YarnConfiguration.MAX_NM_OVERSUBSCRIPTION_ALLOCATION_THRESHOLD); + + if (overAllocationThreshold > 0f) { + ((NodeManager.NMContext) context).setOverAllocationInfo( + OverAllocationInfo.newInstance( + ResourceThresholds.newInstance(overAllocationThreshold))); + + float preemptionThreshold = conf.getFloat( + YarnConfiguration.NM_OVERSUBSCRIPTION_PREEMPTION_THRESHOLD, + YarnConfiguration.DEFAULT_NM_OVERSUBSCRIPTION_PREEMPTION_THRESHOLD); + + this.oversubscriptionPreemptionThresholds = + ResourceThresholds.newInstance(preemptionThreshold); + } + } + private boolean isEnabled() { if (resourceCalculatorPlugin == null) { LOG.info("ResourceCalculatorPlugin is unavailable on this system. 
" diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index 9bc23f6..b9bc259 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitor; @@ -674,6 +675,16 @@ public void setDecommissioned(boolean isDecommissioned) { } @Override + public boolean isOverSubscriptionEnabled() { + return false; + } + + @Override + public OverAllocationInfo getOverAllocationInfo() { + return null; + } + + @Override public NodeResourceMonitor getNodeResourceMonitor() { return null; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index d1c9f6e..931b54f 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -674,7 +674,7 @@ public void testUpdateHeartbeatResponseForAppLifeCycle() { NodeHealthStatus nodeHealth = NodeHealthStatus.newInstance(true, "", System.currentTimeMillis()); NodeStatus nodeStatus = NodeStatus.newInstance(nodeId, 0, statusList, null, - nodeHealth, null, null, null); + nodeHealth, null, null, null, null); node.handle(new RMNodeStatusEvent(nodeId, nodeStatus, null)); Assert.assertEquals(1, node.getRunningApps().size()); @@ -724,7 +724,7 @@ private RMNodeImpl getUnhealthyNode() { NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick", System.currentTimeMillis()); NodeStatus nodeStatus = NodeStatus.newInstance(node.getNodeID(), 0, - new ArrayList(), null, status, null, null, null); + new ArrayList(), null, status, null, null, null, null); node.handle(new RMNodeStatusEvent(node.getNodeID(), nodeStatus, null)); Assert.assertEquals(NodeState.UNHEALTHY, node.getState()); return node; @@ -922,7 +922,7 @@ public void testDecommissioningUnhealthy() { NodeHealthStatus status = NodeHealthStatus.newInstance(false, "sick", System.currentTimeMillis()); NodeStatus nodeStatus = NodeStatus.newInstance(node.getNodeID(), 0, - new ArrayList(), null, status, null, null, null); + new ArrayList(), null, status, null, null, null, null); node.handle(new RMNodeStatusEvent(node.getNodeID(), nodeStatus, null)); Assert.assertEquals(NodeState.DECOMMISSIONING, node.getState()); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java index 087199d..0f1fd92 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java @@ -166,7 +166,7 @@ public void testLogAggregationStatus() throws Exception { node1ReportForApp.add(report1); NodeStatus nodeStatus1 = NodeStatus.newInstance(node1.getNodeID(), 0, new ArrayList(), null, - NodeHealthStatus.newInstance(true, null, 0), null, null, null); + NodeHealthStatus.newInstance(true, null, 0), null, null, null, null); node1.handle(new RMNodeStatusEvent(node1.getNodeID(), nodeStatus1, null, node1ReportForApp)); @@ -180,7 +180,7 @@ public void testLogAggregationStatus() throws Exception { node2ReportForApp.add(report2); NodeStatus nodeStatus2 = NodeStatus.newInstance(node2.getNodeID(), 0, new ArrayList(), null, - NodeHealthStatus.newInstance(true, null, 0), null, null, null); + NodeHealthStatus.newInstance(true, null, 0), null, null, null, null); node2.handle(new RMNodeStatusEvent(node2.getNodeID(), nodeStatus2, null, node2ReportForApp)); // node1 and node2 has updated its log aggregation status diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java index 3fd1fd5..9d6ad5e 100644 --- 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java @@ -145,7 +145,8 @@ public void testNodesDefaultWithUnHealthyNode() throws JSONException, NodeHealthStatus nodeHealth = NodeHealthStatus.newInstance(false, "test health report", System.currentTimeMillis()); NodeStatus nodeStatus = NodeStatus.newInstance(nm3.getNodeId(), 1, - new ArrayList(), null, nodeHealth, null, null, null); + new ArrayList(), null, nodeHealth, null, null, null, + null); node.handle(new RMNodeStatusEvent(nm3.getNodeId(), nodeStatus, null)); rm.NMwaitForState(nm3.getNodeId(), NodeState.UNHEALTHY); @@ -657,7 +658,7 @@ public void testNodesResourceUtilization() throws JSONException, Exception { 2048, 0, (float) 5.05); NodeStatus nodeStatus = NodeStatus.newInstance(nm1.getNodeId(), 0, new ArrayList(), null, nodeHealth, containerResource, - nodeResource, null); + nodeResource, null, null); node.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus, null)); rm.NMwaitForState(nm1.getNodeId(), NodeState.RUNNING); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnClusterNodeUtilization.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnClusterNodeUtilization.java index cf83e67..2a8c580 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnClusterNodeUtilization.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestMiniYarnClusterNodeUtilization.java @@ 
-190,6 +190,7 @@ private NodeStatus createNodeStatus( NodeHealthStatus.newInstance(true, null, 0), containersUtilization, nodeUtilization, + null, null); return status;