diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java index 0b65e5c..7bceb6e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java @@ -231,4 +231,16 @@ public abstract void setIncreaseRequests( @Unstable public abstract void setDecreaseRequests( List decreaseRequests); + + /** + * This lets the RM know that the ApplicationMaster is running + * on a node where distributed scheduling has been enabled + */ + @Public + @Unstable + public abstract void setDistributedSchedulingEnabled(); + + @Public + @Unstable + public abstract boolean isDistributedSchedulingEnabled(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java index d1b2a3a..4bd4787 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NMToken; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.PreemptionMessage; import org.apache.hadoop.yarn.api.records.Priority; @@ -328,4 +329,21 @@ public abstract void setDecreasedContainers( @Private @Unstable public abstract void setApplicationPriority(Priority priority); + + @Private + @Unstable + public abstract void setNewNodes(List newNodes); + + @Private + @Unstable + public abstract List getNewNodes(); + + @Private + @Unstable + public abstract void setRemovedNodes(List removedNodes); + + @Private + @Unstable + public abstract List getRemovedNodes(); + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterRequest.java index 395e190..0e913b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterRequest.java @@ -19,7 +19,9 @@ package org.apache.hadoop.yarn.api.protocolrecords; import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.util.Records; @@ -132,4 +134,16 @@ public static RegisterApplicationMasterRequest newInstance(String host, @Public @Stable public abstract void setTrackingUrl(String trackingUrl); + + /** + * This lets the RM know that the ApplicationMaster is running + * on a node where distributed scheduling has been enabled + */ + @Public + @Unstable + public abstract void setDistributedSchedulingEnabled(); + + @Public + @Unstable + public abstract boolean isDistributedSchedulingEnabled(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java index 1a51ba6..1e9401d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.NMToken; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.util.Records; @@ -202,4 +203,97 @@ public abstract void setContainersFromPreviousAttempts( @Unstable public abstract void setSchedulerResourceTypes( EnumSet types); + + /** + * Set/Get Minimum allocatable capability + * @param minResource + */ + @Private + @Unstable + public abstract void setMinAllocatableCapabilty(Resource minResource); + + @Private + @Unstable + public abstract Resource getMinAllocatableCapabilty(); + + /** + * Set/Get Maximum allocatable capability + * @param maxResource + */ + @Private + @Unstable + public abstract void setMaxAllocatableCapabilty(Resource maxResource); + + @Private + @Unstable + public abstract Resource getMaxAllocatableCapabilty(); + + /** + * Set/Get Maximum number of QUEUEABLE containers that can be allocated + * per node + * @param maxQueueablePerNode + */ + @Private + @Unstable + public abstract void setMaxQueueableContainersPerNode(int maxQueueablePerNode); + + @Private + @Unstable + public abstract int getMaxQueueableContainersPerNode(); + + /** + * Set/Get Maximum number of QUEUEABLE containers that can be allocated + * @param maxQueueable + */ + @Private + @Unstable + public abstract void setMaxQueueableContainers(int maxQueueable); + + @Private + @Unstable + public abstract int getMaxQueueableContainers(); + + /** + * Set/Get the Token expiry interval for all containers allocation via + * Distributed Scheduling + * @param interval + */ + public abstract void setContainerTokenExpiryInterval(int interval); + + @Private + @Unstable + public abstract int getContainerTokenExpiryInterval(); + + /** + * Set/Get whether Strictly Local requests are supported when Distributed + * Scheduling is switched on + * @param isLocalityEnabled + */ + public abstract void setLocalityEnabled(boolean isLocalityEnabled); + + @Private + @Unstable + public abstract boolean isLocalityEnabled(); + + /** + * Set/Get start of containerId counter for Containers allocated via + * the Distributed Scheduler + * @param containerIdStart + */ + public abstract void setContainerIdStart(long containerIdStart); + + @Private + @Unstable + public abstract long getContainerIdStart(); + + /** + * Set list of nodes on which Containers can be allocated via + * Distributed Scheduling + * @param nodeList + */ + public abstract void setNodeList(List nodeList); + + @Private + @Unstable + public abstract List getNodeList(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index f493fd3..f4ee474 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -282,6 +282,57 @@ private static void addDeprecatedKeys() { /** ACL used in case none is found. Allows nothing. */ public static final String DEFAULT_YARN_APP_ACL = " "; + /** Is Distributed Scheduling Enabled */ + public static String DIST_SCHEDULING_ENABLED = + YARN_PREFIX + "distributed-scheduling.enabled"; + public static final boolean DIST_SCHEDULING_ENABLED_DEFAULT = false; + + /** Mininum allocatable container memory for Distributed Scheduling */ + public static String DIST_SCHEDULING_MIN_MEMORY = + YARN_PREFIX + "distributed-scheduling.min-memory"; + public static final int DIST_SCHEDULING_MIN_MEMORY_DEFAULT = 512; + + /** Mininum allocatable container vcores for Distributed Scheduling */ + public static String DIST_SCHEDULING_MIN_VCORES = + YARN_PREFIX + "distributed-scheduling.min-vcores"; + public static final int DIST_SCHEDULING_MIN_VCORES_DEFAULT = 1; + + /** Maximum allocatable container memory for Distributed Scheduling */ + public static String DIST_SCHEDULING_MAX_MEMORY = + YARN_PREFIX + "distributed-scheduling.max-memory"; + public static final int DIST_SCHEDULING_MAX_MEMORY_DEFAULT = 2048; + + /** Maximum allocatable container vcores for Distributed Scheduling */ + public static String DIST_SCHEDULING_MAX_VCORES = + YARN_PREFIX + "distributed-scheduling.max-vcores"; + public static final int DIST_SCHEDULING_MAX_VCORES_DEFAULT = 4; + + /** Maximum number of Queueable container allowed per app for + * Distributed Scheduling */ + public static String DIST_SCHEDULING_MAX_QUEUEABLE_CONTAINERS = + YARN_PREFIX + "distributed-scheduling.max-queueable-containers"; + public static final int DIST_SCHEDULING_MAX_QUEUEABLE_CONTAINERS_DEFAULT = + 100; + + /** Maximum number of Queueable container allowed per app per node for + * Distributed Scheduling */ + public static String DIST_SCHEDULING_MAX_QUEUEABLE_CONTAINERS_PER_NODE = + YARN_PREFIX + "distributed-scheduling.max-queueable-containers-per-app"; + public static final + int DIST_SCHEDULING_MAX_QUEUEABLE_CONTAINERS_PER_NODE_DEFAULT = 100; + + /** Container token expiry for container allocated via + * Distributed Scheduling */ + public static String DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS = + YARN_PREFIX + "distributed-scheduling.container-token-expiry"; + public static final int DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT = + 600000; + + /** Allow Strict locality allocation for Distributed Scheduling */ + public static String DIST_SCHEDULING_LOCALITY_ENABLED = + YARN_PREFIX + "distributed-scheduling.container-token-expiry"; + public static final boolean DIST_SCHEDULING_LOCALITY_ENABLED_DEFAULT = false; + /** * Enable/disable intermediate-data encryption at YARN level. For now, this * only is used by the FileSystemRMStateStore to setup right file-system diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 8924eba..2506f07 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -38,6 +38,7 @@ message RegisterApplicationMasterRequestProto { optional string host = 1; optional int32 rpc_port = 2; optional string tracking_url = 3; + optional bool distributed_scheduling_enabled = 4; } message RegisterApplicationMasterResponseProto { @@ -48,6 +49,17 @@ message RegisterApplicationMasterResponseProto { optional string queue = 5; repeated NMTokenProto nm_tokens_from_previous_attempts = 6; repeated SchedulerResourceTypes scheduler_resource_types = 7; + + // The rest will be sent only if Distributed Scheduling is enabled // + + optional ResourceProto max_alloc_capability = 8; + optional ResourceProto min_alloc_capability = 9; + optional int32 max_queueable_containers_per_node = 12; + optional int32 max_queueable_containers = 13; + optional int32 container_token_expiry_interval = 14; + optional bool is_locality_enabled = 15; + optional int64 container_id_start = 16; + repeated NodeIdProto node_list = 17; } message FinishApplicationMasterRequestProto { @@ -68,6 +80,7 @@ message AllocateRequestProto { optional float progress = 5; repeated ContainerResourceChangeRequestProto increase_request = 6; repeated ContainerResourceChangeRequestProto decrease_request = 7; + optional bool distributed_scheduling_enabled = 8; } message NMTokenProto { @@ -89,6 +102,11 @@ message AllocateResponseProto { repeated ContainerProto decreased_containers = 11; optional hadoop.common.TokenProto am_rm_token = 12; optional PriorityProto application_priority = 13; + + // The rest will be sent only if Distributed Scheduling is enabled // + + repeated NodeIdProto new_nodes = 16; + repeated NodeIdProto removed_nodes = 17; } enum SchedulerResourceTypes { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java index d6db32c..ad585a2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java @@ -233,6 +233,21 @@ private void initAsks() { this.ask.add(convertFromProtoFormat(c)); } } + + @Override + public void setDistributedSchedulingEnabled() { + maybeInitBuilder(); + builder.setDistributedSchedulingEnabled(true); + } + + @Override + public boolean isDistributedSchedulingEnabled() { + AllocateRequestProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasDistributedSchedulingEnabled()) { + return false; + } + return p.getDistributedSchedulingEnabled(); + } private void addAsksToProto() { maybeInitBuilder(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java index da87465..ae51e06 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NMToken; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.PreemptionMessage; import org.apache.hadoop.yarn.api.records.Priority; @@ -39,12 +40,14 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.PreemptionMessagePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto; @@ -77,6 +80,9 @@ private Token amrmToken = null; private Priority appPriority = null; + private List newNodes = null; + private List removedNodes = null; + public AllocateResponsePBImpl() { builder = AllocateResponseProto.newBuilder(); } @@ -125,6 +131,18 @@ private synchronized void mergeLocalToBuilder() { Iterable iterable = getTokenProtoIterable(nmTokens); builder.addAllNmTokens(iterable); } + if (newNodes != null) { + builder.clearNewNodes(); + Iterable iterable = + getNodeIdProtoIterable(newNodes); + builder.addAllNewNodes(iterable); + } + if (removedNodes != null) { + builder.clearNewNodes(); + Iterable iterable = + getNodeIdProtoIterable(removedNodes); + builder.addAllNewNodes(iterable); + } if (this.completedContainersStatuses != null) { builder.clearCompletedContainerStatuses(); Iterable iterable = @@ -342,6 +360,99 @@ public synchronized void setNumClusterNodes(int numNodes) { } @Override + public void setNewNodes(List newNodes) { + maybeInitBuilder(); + if (newNodes == null || newNodes.isEmpty()) { + if (this.newNodes != null) { + this.newNodes.clear(); + } + builder.clearNewNodes(); + return; + } + this.newNodes = new ArrayList<>(); + this.newNodes.addAll(newNodes); + } + + @Override + public List getNewNodes() { + if (newNodes != null) { + return newNodes; + } + initLocalNewNodeList(); + return newNodes; + } + + private synchronized void initLocalNewNodeList() { + AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getNewNodesList(); + newNodes = new ArrayList<>(); + for (YarnProtos.NodeIdProto t : list) { + newNodes.add(convertFromProtoFormat(t)); + } + } + + @Override + public void setRemovedNodes(List removedNodes) { + maybeInitBuilder(); + if (removedNodes == null || removedNodes.isEmpty()) { + if (this.removedNodes != null) { + this.removedNodes.clear(); + } + builder.clearRemovedNodes(); + return; + } + this.removedNodes = new ArrayList<>(); + this.removedNodes.addAll(removedNodes); + } + + @Override + public List getRemovedNodes() { + if (removedNodes != null) { + return removedNodes; + } + initLocalRemovedNodeList(); + return removedNodes; + } + + private synchronized void initLocalRemovedNodeList() { + AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getRemovedNodesList(); + removedNodes = new ArrayList<>(); + for (YarnProtos.NodeIdProto t : list) { + removedNodes.add(convertFromProtoFormat(t)); + } + } + + private synchronized Iterable getNodeIdProtoIterable( + final List nodeList) { + maybeInitBuilder(); + return new Iterable() { + @Override + public synchronized Iterator iterator() { + return new Iterator() { + + Iterator iter = nodeList.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public YarnProtos.NodeIdProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + } + + @Override public synchronized PreemptionMessage getPreemptionMessage() { AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; if (this.preempt != null) { @@ -680,4 +791,12 @@ private PriorityPBImpl convertFromProtoFormat(PriorityProto p) { private PriorityProto convertToProtoFormat(Priority t) { return ((PriorityPBImpl)t).getProto(); } + + private YarnProtos.NodeIdProto convertToProtoFormat(NodeId nodeId) { + return ((NodeIdPBImpl) nodeId).getProto(); + } + + private NodeId convertFromProtoFormat(YarnProtos.NodeIdProto proto) { + return new NodeIdPBImpl(proto); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterRequestPBImpl.java index 037dfd9..73766e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterRequestPBImpl.java @@ -131,4 +131,19 @@ public void setTrackingUrl(String url) { } builder.setTrackingUrl(url); } -} + + @Override + public void setDistributedSchedulingEnabled() { + maybeInitBuilder(); + builder.setDistributedSchedulingEnabled(true); + } + + @Override + public boolean isDistributedSchedulingEnabled() { + RegisterApplicationMasterRequestProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasDistributedSchedulingEnabled()) { + return false; + } + return p.getDistributedSchedulingEnabled(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java index a95aadf..a2d8906 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java @@ -25,14 +25,20 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; + + + import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.NMToken; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; @@ -60,6 +66,10 @@ private List nmTokens = null; private EnumSet schedulerResourceTypes = null; + private Resource maxAllocatableCapability; + private Resource minAllocatableCapability; + private List nodeList; + public RegisterApplicationMasterResponsePBImpl() { builder = RegisterApplicationMasterResponseProto.newBuilder(); } @@ -120,6 +130,12 @@ private void mergeLocalToBuilder() { Iterable iterable = getTokenProtoIterable(nmTokens); builder.addAllNmTokensFromPreviousAttempts(iterable); } + if (nodeList != null) { + builder.clearNodeList(); + Iterable iterable = + getNodeIdProtoIterable(nodeList); + builder.addAllNodeList(iterable); + } if(schedulerResourceTypes != null) { addSchedulerResourceTypes(); } @@ -433,6 +449,190 @@ public void setSchedulerResourceTypes(EnumSet types) { this.schedulerResourceTypes.addAll(types); } + @Override + public void setMinAllocatableCapabilty(Resource minResource) { + maybeInitBuilder(); + if(minAllocatableCapability == null) { + builder.clearMinAllocCapability(); + } + this.minAllocatableCapability = minResource; + } + + @Override + public Resource getMinAllocatableCapabilty() { + if (this.minAllocatableCapability != null) { + return this.minAllocatableCapability; + } + + RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasMinAllocCapability()) { + return null; + } + + this.minAllocatableCapability = convertFromProtoFormat(p.getMinAllocCapability()); + return this.minAllocatableCapability; + } + + @Override + public void setMaxAllocatableCapabilty(Resource maxResource) { + maybeInitBuilder(); + if(maxAllocatableCapability == null) { + builder.clearMaxAllocCapability(); + } + this.maxAllocatableCapability = maxResource; + } + + @Override + public Resource getMaxAllocatableCapabilty() { + if (this.maxAllocatableCapability != null) { + return this.maxAllocatableCapability; + } + + RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasMaxAllocCapability()) { + return null; + } + + this.maxAllocatableCapability = convertFromProtoFormat(p.getMaxAllocCapability()); + return this.maxAllocatableCapability; + } + + @Override + public void setMaxQueueableContainersPerNode(int maxQueueablePerNode) { + maybeInitBuilder(); + builder.setMaxQueueableContainersPerNode(maxQueueablePerNode); + } + + @Override + public int getMaxQueueableContainersPerNode() { + RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasMaxQueueableContainersPerNode()) { + return 0; + } + return p.getMaxQueueableContainersPerNode(); + } + + @Override + public void setMaxQueueableContainers(int maxQueueable) { + maybeInitBuilder(); + builder.setMaxQueueableContainers(maxQueueable); + } + + @Override + public int getMaxQueueableContainers() { + RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasMaxQueueableContainers()) { + return 0; + } + return p.getMaxQueueableContainers(); + } + + @Override + public void setContainerTokenExpiryInterval(int interval) { + maybeInitBuilder(); + builder.setContainerTokenExpiryInterval(interval); + } + + @Override + public int getContainerTokenExpiryInterval() { + RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasContainerTokenExpiryInterval()) { + return 0; + } + return p.getContainerTokenExpiryInterval(); + } + + @Override + public void setLocalityEnabled(boolean isLocalityEnabled) { + maybeInitBuilder(); + builder.setIsLocalityEnabled(isLocalityEnabled); + } + + @Override + public boolean isLocalityEnabled() { + RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasIsLocalityEnabled()) { + return false; + } + return p.getIsLocalityEnabled(); + } + + @Override + public void setContainerIdStart(long containerIdStart) { + maybeInitBuilder(); + builder.setContainerIdStart(containerIdStart); + } + + @Override + public long getContainerIdStart() { + RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasContainerIdStart()) { + return 0; + } + return p.getContainerIdStart(); + } + + @Override + public void setNodeList(List nodeList) { + maybeInitBuilder(); + if (nodeList == null || nodeList.isEmpty()) { + if (this.nodeList != null) { + this.nodeList.clear(); + } + builder.clearNodeList(); + return; + } + this.nodeList = new ArrayList<>(); + this.nodeList.addAll(nodeList); + } + + private synchronized void initLocalNewNodeList() { + RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getNodeListList(); + nodeList = new ArrayList<>(); + for (YarnProtos.NodeIdProto t : list) { + nodeList.add(convertFromProtoFormat(t)); + } + } + + @Override + public List getNodeList() { + if (nodeList != null) { + return nodeList; + } + initLocalNewNodeList(); + return nodeList; + } + + private synchronized Iterable getNodeIdProtoIterable( + final List nodeList) { + maybeInitBuilder(); + return new Iterable() { + @Override + public synchronized Iterator iterator() { + return new Iterator() { + + Iterator iter = nodeList.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public YarnProtos.NodeIdProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + } + private Resource convertFromProtoFormat(ResourceProto resource) { return new ResourcePBImpl(resource); } @@ -456,4 +656,12 @@ private NMTokenProto convertToProtoFormat(NMToken token) { private NMToken convertFromProtoFormat(NMTokenProto proto) { return new NMTokenPBImpl(proto); } + + private YarnProtos.NodeIdProto convertToProtoFormat(NodeId nodeId) { + return ((NodeIdPBImpl) nodeId).getProto(); + } + + private NodeId convertFromProtoFormat(YarnProtos.NodeIdProto proto) { + return new NodeIdPBImpl(proto); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index f8a1320..3449908 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; @@ -78,4 +79,5 @@ void setSystemCredentialsForApps( List getContainersToDecrease(); void addAllContainersToDecrease(Collection containersToDecrease); + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java index 759b6f2..baa79f3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java @@ -148,4 +148,5 @@ public static Token newInstance(byte[] password, .buildTokenService(addr).toString()); return nmToken; } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 9c2d1fb..a4211bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.scheduler.ContainerAllocator; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; @@ -87,4 +88,8 @@ ConcurrentLinkedQueue getLogAggregationStatusForApps(); + + boolean isDistributedSchedulingEnabled(); + + ContainerAllocator getContainerAllocator(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 04e383f..7aa7df6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.scheduler.ContainerAllocator; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer; @@ -187,9 +188,9 @@ protected DeletionService createDeletionService(ContainerExecutor exec) { protected NMContext createNMContext( NMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInNM nmTokenSecretManager, - NMStateStoreService stateStore) { + NMStateStoreService stateStore, boolean isDistSchedulerEnabled) { return new NMContext(containerTokenSecretManager, nmTokenSecretManager, - dirsHandler, aclsManager, stateStore); + dirsHandler, aclsManager, stateStore, isDistSchedulerEnabled); } protected void doSecureLogin() throws IOException { @@ -310,8 +311,12 @@ protected void serviceInit(Configuration conf) throws Exception { getNodeHealthScriptRunner(conf), dirsHandler); addService(nodeHealthChecker); + boolean isDistSchedulingEnabled = + conf.getBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, + YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT); + this.context = createNMContext(containerTokenSecretManager, - nmTokenSecretManager, nmStore); + nmTokenSecretManager, nmStore, isDistSchedulingEnabled); nodeLabelsProvider = createNodeLabelsProvider(conf); @@ -340,6 +345,9 @@ protected void serviceInit(Configuration conf) throws Exception { addService(webServer); ((NMContext) context).setWebServer(webServer); + ((NMContext) context).setContainerAllocator(new ContainerAllocator( + nodeStatusUpdater, context, webServer.getPort())); + dispatcher.register(ContainerManagerEventType.class, containerManager); dispatcher.register(NodeManagerEventType.class, this); addService(dispatcher); @@ -461,11 +469,14 @@ public void run() { private boolean isDecommissioned = false; private final ConcurrentLinkedQueue logAggregationReportForApps; + private final boolean isDistSchedulingEnabled; + + private ContainerAllocator containerAllocator; public NMContext(NMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInNM nmTokenSecretManager, LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager, - NMStateStoreService stateStore) { + NMStateStoreService stateStore, boolean isDistSchedulingEnabled) { this.containerTokenSecretManager = containerTokenSecretManager; this.nmTokenSecretManager = nmTokenSecretManager; this.dirsHandler = dirsHandler; @@ -476,6 +487,7 @@ public NMContext(NMContainerTokenSecretManager containerTokenSecretManager, this.stateStore = stateStore; this.logAggregationReportForApps = new ConcurrentLinkedQueue< LogAggregationReport>(); + this.isDistSchedulingEnabled = isDistSchedulingEnabled; } /** @@ -588,6 +600,20 @@ public void setSystemCrendentialsForApps( getLogAggregationStatusForApps() { return this.logAggregationReportForApps; } + + @Override + public boolean isDistributedSchedulingEnabled() { + return isDistSchedulingEnabled; + } + + public void setContainerAllocator(ContainerAllocator containerAllocator) { + this.containerAllocator = containerAllocator; + } + + @Override + public ContainerAllocator getContainerAllocator() { + return containerAllocator; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java index bd6538c..079f8bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/AMRMProxyService.java @@ -63,6 +63,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; + +import org.apache.hadoop.yarn.server.nodemanager.scheduler + .DistSchedulerRequestInterceptor; import org.apache.hadoop.yarn.server.security.MasterKeyData; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; @@ -456,6 +459,11 @@ protected RequestInterceptor createRequestInterceptorChain() { conf.get( YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE, YarnConfiguration.DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE); + if (this.nmContext.isDistributedSchedulingEnabled()) { + configuredInterceptorClassNames = + DistSchedulerRequestInterceptor.class.getName() + "," + + configuredInterceptorClassNames; + } List interceptorClassNames = new ArrayList(); Collection tempList = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/ContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/ContainerAllocator.java new file mode 100644 index 0000000..5728274 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/ContainerAllocator.java @@ -0,0 +1,262 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.scheduler; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.yarn.api.records.*; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.api.ContainerType; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; + +import java.net.InetSocketAddress; +import java.util.*; +import java.util.concurrent.atomic.AtomicLong; + +public class ContainerAllocator { + + private static final Log LOG = + LogFactory.getLog(ContainerAllocator.class); + + static class ContainerIdCounter { + final AtomicLong containerIdCounter = new AtomicLong(1); + + void resetContainerIdCounter(long containerIdStart) { + this.containerIdCounter.set(containerIdStart % 2 == 0 ? + containerIdStart + 1 : containerIdStart); + } + + long generateContainerId() { + return this.containerIdCounter.getAndAdd(2); + } + } + + private final NodeStatusUpdater nodeStatusUpdater; + private final Context context; + private int webpagePort; + + public ContainerAllocator(NodeStatusUpdater nodeStatusUpdater, + Context context, int webpagePort) { + this.nodeStatusUpdater = nodeStatusUpdater; + this.context = context; + this.webpagePort = webpagePort; + } + + public Map> allocate(DistSchedulerParams appParams, + ContainerIdCounter idCounter, Collection resourceAsks, + Set blacklist, ApplicationAttemptId appAttId, + Map allNodes, String userName, + int allocationLimit) throws YarnException { + Map> containers = new HashMap<>(); + if (allocationLimit == 0) + return containers; + Set nodesAllocated = new HashSet<>(); + // filter the requests + // TODO (ARUN): Check if this is really whats expected + List anyAsks = new ArrayList<>(); + int totalAllocated = 0; + for (ResourceRequest rr : resourceAsks) { + if (ResourceRequest.isAnyLocation(rr.getResourceName()) + || rr.getResourceName().contains(NetworkTopology.DEFAULT_RACK)) { + anyAsks.add(rr); + resourceAsks.remove(rr); + } + } + if (appParams.isLocalityEnabled()) { + totalAllocated = allocateStrictlyLocalContainers(appParams, idCounter, + resourceAsks, appAttId, allNodes, userName, allocationLimit, + containers, nodesAllocated, totalAllocated); + } + // Anything left over...assign on the best nodes by biasing away from what + // has already been allocated. + for (ResourceRequest anyAsk : anyAsks) { + allocateNonStrictlyLocalContainers(appParams, idCounter, blacklist, + appAttId, allNodes, userName, allocationLimit, + containers, nodesAllocated, totalAllocated, anyAsk); + } + if (resourceAsks.size() > 0) { + LOG.info("Queueable allocation requested for: " + resourceAsks.size() + + " containers; allocated = " + containers.size()); + } + return containers; + } + + private void allocateNonStrictlyLocalContainers(DistSchedulerParams appParams, + ContainerIdCounter idCounter, Set blacklist, + ApplicationAttemptId id, Map allNodes, String userName, + int allocationLimit, Map> containers, + Set nodesAllocated, int totalAllocated, ResourceRequest anyAsk) + throws YarnException { + int leftOver = Math.min( + anyAsk.getNumContainers() - containers.size(), + allocationLimit - totalAllocated); + + List topKNodes = getTopKNodes(allNodes.keySet()); + List topKNodesLeft = new ArrayList<>(); + for (String s : topKNodes) { + // Bias away from whatever we have already allocated and respect blacklist + if (nodesAllocated.contains(s) || blacklist.contains(s)) + continue; + topKNodesLeft.add(s); + } + leftOver = Math.min( + leftOver, + topKNodesLeft.size() * appParams.getMaxQueueableContainersPerNode()); + if (leftOver > 0) { + int numAllocated = 0; + int nextNodeToAllocate = 0; + for (int numCont = 0; numCont < Math.min(leftOver, + appParams.getMaxQueueableContainers()); numCont++) { + String topNode = topKNodesLeft.get(nextNodeToAllocate); + nextNodeToAllocate++; + nextNodeToAllocate %= topKNodesLeft.size(); + NodeId nodeId = allNodes.get(topNode); + Container container = + buildContainer(appParams, idCounter, anyAsk, id, userName, nodeId); + List cList = containers.get(anyAsk.getCapability()); + if (cList == null) { + cList = new ArrayList<>(); + containers.put(anyAsk.getCapability(), cList); + } + cList.add(container); + numAllocated++; + LOG.info("# of containers left over for any = " + leftOver + + " ; allocated = " + numAllocated); + } + } + } + + private int allocateStrictlyLocalContainers(DistSchedulerParams appParams, + ContainerIdCounter idCounter, Collection resourceAsks, + ApplicationAttemptId id, Map allNodes, String userName, + int allocationLimit, Map> containers, + Set nodesAllocated, int totalAllocated) throws YarnException { + // Assign all the host-specific requests + for (ResourceRequest rr : resourceAsks) { + if (rr.getNumContainers() == 0) + continue; + NodeId nodeId = allNodes.get(rr.getResourceName()); + if (nodeId == null) { + LOG.debug("Dont' have token for node: " + rr.getResourceName()); + continue; + } + int numAllocated = 0; + int numAllocatable = + Math.min( + Math.min( + rr.getNumContainers(), + appParams.getMaxQueueableContainersPerNode()), + allocationLimit - totalAllocated); + for (int numCont = 0; numCont < numAllocatable; numCont++) { + Container container = + buildContainer(appParams, idCounter, rr, id, userName, nodeId); + List cList = containers.get(rr.getCapability()); + if (cList == null) { + cList = new ArrayList<>(); + containers.put(rr.getCapability(), cList); + } + cList.add(container); + numAllocated++; + totalAllocated++; + LOG.info("# of containers requested on: " + rr.getResourceName() + + " = " + rr.getNumContainers() + " ; allocated = " + numAllocated); + if (rr.getNumContainers() > + appParams.getMaxQueueableContainersPerNode()) { + // To bias away from this node when doing "any" allocation below... + nodesAllocated.add(rr.getResourceName()); + } + } + } + return totalAllocated; + } + + private Container buildContainer(DistSchedulerParams appParams, + ContainerIdCounter idCounter, ResourceRequest rr, ApplicationAttemptId id, + String userName, NodeId nodeId) throws YarnException { + ContainerId cId = + ContainerId.newContainerId(id, idCounter.generateContainerId()); + rr.getCapability().setMemory( + fixMemory(appParams, rr.getCapability().getMemory())); + rr.getCapability().setVirtualCores( + fixVCores(appParams, rr.getCapability().getVirtualCores())); + long currTime = System.currentTimeMillis(); + ContainerTokenIdentifier containerTokenIdentifier = + new ContainerTokenIdentifier( + cId, nodeId.getHost(), userName, rr.getCapability(), + currTime + appParams.getContainerTokenExpiryInterval(), + context.getContainerTokenSecretManager().getCurrentKey().getKeyId(), + nodeStatusUpdater.getRMIdentifier(), rr.getPriority(), currTime, + null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK, + ExecutionType.QUEUEABLE); + byte[] pwd = + context.getContainerTokenSecretManager().createPassword( + containerTokenIdentifier); + Token containerToken = newContainerToken(nodeId, pwd, + containerTokenIdentifier); + Container container = BuilderUtils.newContainer( + cId, nodeId, nodeId.getHost() + ":" + webpagePort, + rr.getCapability(), rr.getPriority(), containerToken); + return container; + } + + // #!# Round to the next multiple of container dimension + private int fixMemory(DistSchedulerParams appParams, int memory) + throws YarnException { + if (memory > appParams.getMaxAllocMb()) { + return appParams.getMaxAllocMb(); + } + return appParams.getMinAllocMb() * + ((int) Math.ceil((float) memory / appParams.getMinAllocMb())); + } + + // #!# Round to the select the right number of virtual core + private int fixVCores(DistSchedulerParams appParams, int vCores) + throws YarnException { + if (vCores > appParams.getMaxVCores()) { + return appParams.getMaxVCores(); + } + return vCores < appParams.getMinVCores() ? appParams.getMinVCores() : vCores; + } + + public static Token newContainerToken(NodeId nodeId, byte[] password, + ContainerTokenIdentifier tokenIdentifier) { + // RPC layer client expects ip:port as service for tokens + InetSocketAddress addr = NetUtils.createSocketAddrForHost(nodeId.getHost(), + nodeId.getPort()); + // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token + Token containerToken = Token.newInstance(tokenIdentifier.getBytes(), + ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil + .buildTokenService(addr).toString()); + return containerToken; + } + + // TODO : Not implemented yet !!. + // TODO : The TopKNodes would be a subset of the list of available nodes. + // TODO : Possibly an ordered list of lightly loaded nodes + private List getTopKNodes(Collection allNodes) { + return new ArrayList<>(allNodes); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistSchedulerParams.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistSchedulerParams.java new file mode 100644 index 0000000..95b243e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistSchedulerParams.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.scheduler; + +public class DistSchedulerParams { + + private int maxAllocMb; + private int minAllocMb; + private int maxVCores; + private int minVCores; + private int maxQueueableContainersPerNode; + private int maxQueueableContainers; + private int containerTokenExpiryInterval; + private boolean isLocalityEnabled; + + public int getMaxAllocMb() { + return maxAllocMb; + } + + public void setMaxAllocMb(int maxAllocMb) { + this.maxAllocMb = maxAllocMb; + } + + public int getMinAllocMb() { + return minAllocMb; + } + + public void setMinAllocMb(int minAllocMb) { + this.minAllocMb = minAllocMb; + } + + public int getMaxVCores() { + return maxVCores; + } + + public void setMaxVCores(int maxVCores) { + this.maxVCores = maxVCores; + } + + public int getMinVCores() { + return minVCores; + } + + public void setMinVCores(int minVCores) { + this.minVCores = minVCores; + } + + public int getMaxQueueableContainersPerNode() { + return maxQueueableContainersPerNode; + } + + public void setMaxQueueableContainersPerNode(int + maxQueueableContainersPerNode) { + this.maxQueueableContainersPerNode = maxQueueableContainersPerNode; + } + + public int getMaxQueueableContainers() { + return maxQueueableContainers; + } + + public void setMaxQueueableContainers(int maxQueueableContainers) { + this.maxQueueableContainers = maxQueueableContainers; + } + + public int getContainerTokenExpiryInterval() { + return containerTokenExpiryInterval; + } + + public void setContainerTokenExpiryInterval(int + containerTokenExpiryInterval) { + this.containerTokenExpiryInterval = containerTokenExpiryInterval; + } + + public boolean isLocalityEnabled() { + return isLocalityEnabled; + } + + public void setLocalityEnabled(boolean localityEnabled) { + isLocalityEnabled = localityEnabled; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistSchedulerRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistSchedulerRequestInterceptor.java new file mode 100644 index 0000000..34a83b8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/DistSchedulerRequestInterceptor.java @@ -0,0 +1,324 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.scheduler; + +import org.apache.hadoop.yarn.api.protocolrecords.*; +import org.apache.hadoop.yarn.api.records.*; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AbstractRequestInterceptor; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.*; + +public class DistSchedulerRequestInterceptor extends + AbstractRequestInterceptor { + + class PartitionedResourceRequests { + private List guaranteed = new ArrayList<>(); + private List queueable = new ArrayList<>(); + public List getGuaranteed() { + return guaranteed; + } + public List getQueueable() { + return queueable; + } + } + + public static class PriorityComparator implements Comparator { + @Override + public int compare(Priority o1, Priority o2) { + return o1.getPriority() - o2.getPriority(); + } + } + + private static final Logger LOG = LoggerFactory + .getLogger(DistSchedulerRequestInterceptor.class); + + private int numAllocatedContainers; + private DistSchedulerParams appParams = new DistSchedulerParams(); + private final ContainerAllocator.ContainerIdCounter containerIdCounter = + new ContainerAllocator.ContainerIdCounter(); + private Map nodeList = new HashMap<>(); + private Map nodeTokens = new HashMap<>(); + final Set blacklist = new TreeSet<>(); + final Set priorities = new TreeSet<>(new PriorityComparator()); + final Map>> requests = + new HashMap<>(); + + @Override + public RegisterApplicationMasterResponse registerApplicationMaster + (RegisterApplicationMasterRequest request) throws YarnException, + IOException { + RegisterApplicationMasterResponse registerResponse = + getNextInterceptor().registerApplicationMaster(request); + updateParameters(registerResponse); + return registerResponse; + } + + @Override + public FinishApplicationMasterResponse finishApplicationMaster + (FinishApplicationMasterRequest request) throws YarnException, + IOException { + return getNextInterceptor().finishApplicationMaster(request); + } + + @Override + public AllocateResponse allocate(AllocateRequest request) throws + YarnException, IOException { + PartitionedResourceRequests partitionedAsks = partitionAskList(request + .getAskList()); + request.setAskList(partitionedAsks.getGuaranteed()); + List releasedContainers = request.getReleaseList(); + int numReleasedContainers = releasedContainers.size(); + if (numReleasedContainers > 0) { + LOG.info("AttemptID: " + getApplicationContext() + .getApplicationAttemptId() + " released: " + numReleasedContainers); + numAllocatedContainers -= numReleasedContainers; + } + + // Also, update black list + ResourceBlacklistRequest rbr = request.getResourceBlacklistRequest(); + blacklist.removeAll(rbr.getBlacklistRemovals()); + blacklist.addAll(rbr.getBlacklistAdditions()); + + AllocateResponse response = getNextInterceptor().allocate(request); + updateParameters(response); + List nmTokens = response.getNMTokens(); + for (NMToken nmToken : nmTokens) { + nodeTokens.put(nmToken.getNodeId(), nmToken); + } + int numCompletedContainers = 0; + List completedContainers = + response.getCompletedContainersStatuses(); + + // Only account for queueable containers + for (ContainerStatus cs : completedContainers) { + if (cs.getExecutionType() == ExecutionType.QUEUEABLE) + numCompletedContainers++; + } + numAllocatedContainers -= numCompletedContainers; + updateResourceAsk(partitionedAsks.getQueueable()); + + List allocatedContainers = new ArrayList(); + for (Priority priority : priorities) { + if (getHeadRoom() <= 0) + break; + List ask = getResourceRequests(priority); + int count = 0; + for (ResourceRequest rr : ask) { + count += rr.getNumContainers(); + } + LOG.info("Head room for optimistic container allocation for AM " + + getApplicationContext().getApplicationAttemptId() + + " is = " + getHeadRoom() + + " # of requests at priority: " + priority + " = " + count); + + ContainerAllocator containerAllocator = getApplicationContext() + .getNMCotext().getContainerAllocator(); + for (Map reqMap : + requests.get(priority).values()) { + Map> allocated = + containerAllocator.allocate(this.appParams, containerIdCounter, + new ArrayList(reqMap.values()), blacklist, + getApplicationContext().getApplicationAttemptId(), nodeList, + getApplicationContext().getUser(), getHeadRoom()); + for (Map.Entry> e : allocated.entrySet()) { + updateAllocation(e.getKey(), e.getValue()); + allocatedContainers.addAll(e.getValue()); + } + } + } + updateResponseWithNMTokens(response, nmTokens, allocatedContainers); + return response; + } + + private void updateResponseWithNMTokens(AllocateResponse response, + List nmTokens, List allocatedContainers) { + List newTokens = new ArrayList<>(); + if (allocatedContainers.size() > 0) { + response.getAllocatedContainers().addAll(allocatedContainers); + for (Container alloc : allocatedContainers) { + if (!nodeTokens.containsKey(alloc.getNodeId())) { + newTokens.add(getApplicationContext().getNMCotext(). + getNMTokenSecretManager().generateNMToken( + getApplicationContext().getUser(), alloc)); + } + } + List retTokens = new ArrayList<>(nmTokens); + retTokens.addAll(newTokens); + response.setNMTokens(retTokens); + } + } + + private PartitionedResourceRequests partitionAskList(List + askList) { + PartitionedResourceRequests partitionedRequests = + new PartitionedResourceRequests(); + for (ResourceRequest rr : askList) { + boolean b = ExecutionType.QUEUEABLE == rr.getExecutionType() ? + partitionedRequests.getQueueable().add(rr) : + partitionedRequests.getGuaranteed().add(rr); + } + return partitionedRequests; + } + + public List getResourceRequests(Priority priority) { + List ret = new ArrayList(); + Map> asks = requests.get(priority); + if (asks != null) { + for (Map v : asks.values()) { + ret.addAll(v.values()); + } + } + return ret; + } + + public int getHeadRoom() { + return appParams.getMaxQueueableContainers() == -1 ? Integer.MAX_VALUE : + appParams.getMaxQueueableContainers() - this.numAllocatedContainers; + } + + private void updateParameters( + RegisterApplicationMasterResponse registerResponse) { + appParams.setMinAllocMb( + registerResponse.getMinAllocatableCapabilty().getMemory()); + appParams.setMaxAllocMb( + registerResponse.getMaxAllocatableCapabilty().getMemory()); + appParams.setMinVCores( + registerResponse.getMinAllocatableCapabilty().getVirtualCores()); + appParams.setMaxVCores( + registerResponse.getMaxAllocatableCapabilty().getVirtualCores()); + appParams.setLocalityEnabled(registerResponse.isLocalityEnabled()); + appParams.setContainerTokenExpiryInterval( + registerResponse.getContainerTokenExpiryInterval()); + appParams.setMaxQueueableContainers( + registerResponse.getMaxQueueableContainers()); + appParams.setMaxQueueableContainersPerNode( + registerResponse.getMaxQueueableContainersPerNode()); + + containerIdCounter.resetContainerIdCounter( + registerResponse.getContainerIdStart()); + addToNodeList(registerResponse.getNodeList()); + } + + private void updateParameters(AllocateResponse response) { + addToNodeList(response.getNewNodes()); + removeFromNodeList(response.getRemovedNodes()); + } + + + public void updateResourceAsk(List requests) { + // Update resource requests + for (ResourceRequest request : requests) { + // Handling locality for queueable tokens is optional; we rely on + // "anyAsk" to drive allocations + Priority priority = request.getPriority(); + String resourceName = request.getResourceName(); + + if (!ResourceRequest.isAnyLocation(request.getResourceName()) + && !appParams.isLocalityEnabled()) { + continue; + } + + if (request.getNumContainers() == 0) { + continue; + } + + Map> asks = + this.requests.get(priority); + if (asks == null) { + asks = new HashMap<>(); + this.requests.put(priority, asks); + this.priorities.add(priority); + } + + Map reqMap = asks.get(resourceName); + if (reqMap == null) { + reqMap = new HashMap<>(); + asks.put(resourceName, reqMap); + } + + ResourceRequest resourceRequest = reqMap.get(request.getCapability()); + if (resourceRequest == null) { + resourceRequest = request; + reqMap.put(request.getCapability(), request); + } else { + resourceRequest.setNumContainers( + resourceRequest.getNumContainers() + request.getNumContainers()); + } + if (ResourceRequest.isAnyLocation(request.getResourceName())) { + LOG.info("# of requests in ANY (at priority = " + priority + ", " + + "with capability = " + request.getCapability() + " ) : " + + resourceRequest.getNumContainers()); + } + } + } + + public void updateAllocation(Resource capability, + List allocatedContainers) { + numAllocatedContainers += allocatedContainers.size(); + for (Container c : allocatedContainers) { + Map> asks = this + .requests.get(c.getPriority()); + + if (asks == null) + continue; + + // Host specific accounting + removeFromReqMap(capability, asks.get(c.getNodeId().getHost())); + + // any ask accounting + removeFromReqMap(capability, asks.get(ResourceRequest.ANY)); + } + } + + private void removeFromReqMap(Resource capability, Map rrMap) { + if (rrMap != null) { + ResourceRequest rr = rrMap.get(capability); + if (rr != null) { + rr.setNumContainers(rr.getNumContainers() - 1); + if (rr.getNumContainers() == 0) + rrMap.remove(capability); + } + } + } + + private void setNodeList(List nodeList) { + this.nodeList.clear(); + addToNodeList(nodeList); + } + + private void addToNodeList(List nodes) { + for (NodeId n : nodes) { + this.nodeList.put(n.getHost(), n); + } + } + + private void removeFromNodeList(List nodes) { + for (NodeId n : nodes) { + this.nodeList.remove(n.getHost()); + } + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java index f6169e7..86cce35 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java @@ -29,7 +29,10 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService; @@ -50,7 +53,7 @@ private final Map oldMasterKeys; private final Map> appToAppAttemptMap; private final NMStateStoreService stateStore; - private NodeId nodeId; + private NodeId nodeId; public NMTokenSecretManagerInNM() { this(new NMNullStateStoreService()); @@ -276,4 +279,23 @@ private void removeAppAttemptKey(ApplicationAttemptId attempt) { LOG.error("Unable to remove master key for application " + attempt, e); } } + + /** + * Used by the Distributed Scheduler framework to generate NMTokens + * @param applicationSubmitter + * @param container + * @return NMToken + */ + public NMToken generateNMToken( + String applicationSubmitter, Container container) { + this.readLock.lock(); + try { + Token token = + createNMToken(container.getId().getApplicationAttemptId(), + container.getNodeId(), applicationSubmitter); + return NMToken.newInstance(container.getNodeId(), token); + } finally { + this.readLock.unlock(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java index 3dc62bc..6885804 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java @@ -80,7 +80,7 @@ public void testSuccessfulContainerLaunch() throws InterruptedException, Context context = new NMContext(new NMContainerTokenSecretManager(conf), new NMTokenSecretManagerInNM(), null, null, - new NMNullStateStoreService()) { + new NMNullStateStoreService(), false) { @Override public int getHttpPort() { return 1234; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 90804b8..d0d437c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -1565,7 +1565,7 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context, protected NMContext createNMContext( NMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInNM nmTokenSecretManager, - NMStateStoreService store) { + NMStateStoreService store, boolean isDistributedSchedulingEnabled) { return new MyNMContext(containerTokenSecretManager, nmTokenSecretManager); } @@ -1800,7 +1800,7 @@ public MyNMContext( NMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInNM nmTokenSecretManager) { super(containerTokenSecretManager, nmTokenSecretManager, null, null, - new NMNullStateStoreService()); + new NMNullStateStoreService(), false); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index 9bc23f6..3578488 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; +import org.apache.hadoop.yarn.server.nodemanager.scheduler.ContainerAllocator; import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; @@ -678,5 +679,14 @@ public NodeResourceMonitor getNodeResourceMonitor() { return null; } + @Override + public boolean isDistributedSchedulingEnabled() { + return false; + } + + @Override + public ContainerAllocator getContainerAllocator() { + return null; + } } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java index c902fd5..d070bbe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java @@ -110,7 +110,7 @@ public BaseContainerManagerTest() throws UnsupportedFileSystemException { protected Configuration conf = new YarnConfiguration(); protected Context context = new NMContext(new NMContainerTokenSecretManager( conf), new NMTokenSecretManagerInNM(), null, - new ApplicationACLsManager(conf), new NMNullStateStoreService()) { + new ApplicationACLsManager(conf), new NMNullStateStoreService(), false) { public int getHttpPort() { return HTTP_PORT; }; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java index 2e014de..dfb7a1b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java @@ -471,7 +471,7 @@ private NMContext createContext(Configuration conf, NMStateStoreService stateStore) { NMContext context = new NMContext(new NMContainerTokenSecretManager( conf), new NMTokenSecretManagerInNM(), null, - new ApplicationACLsManager(conf), stateStore){ + new ApplicationACLsManager(conf), stateStore, false){ public int getHttpPort() { return HTTP_PORT; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java index 0abae2b..6f3adb8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java @@ -109,7 +109,7 @@ protected Context distContext = new NMContext(new NMContainerTokenSecretManager( conf), new NMTokenSecretManagerInNM(), null, - new ApplicationACLsManager(conf), new NMNullStateStoreService()) { + new ApplicationACLsManager(conf), new NMNullStateStoreService(), false) { public int getHttpPort() { return HTTP_PORT; }; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java index 9e08b7f..c768df1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java @@ -81,7 +81,8 @@ public void testMinimumPerDirectoryFileLimit() { NMContext nmContext = new NMContext(new NMContainerTokenSecretManager(conf), new NMTokenSecretManagerInNM(), null, - new ApplicationACLsManager(conf), new NMNullStateStoreService()); + new ApplicationACLsManager(conf), new NMNullStateStoreService(), + false); ResourceLocalizationService service = new ResourceLocalizationService(null, null, null, null, nmContext); try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index 64d3d68..4c594e1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -186,7 +186,7 @@ public void setup() throws IOException { conf.set(YarnConfiguration.NM_LOG_DIRS, logDir); nmContext = new NMContext(new NMContainerTokenSecretManager( conf), new NMTokenSecretManagerInNM(), null, - new ApplicationACLsManager(conf), new NMNullStateStoreService()); + new ApplicationACLsManager(conf), new NMNullStateStoreService(), false); } @After @@ -2365,7 +2365,7 @@ private ResourceLocalizationService createSpyService( NMContext nmContext = new NMContext(new NMContainerTokenSecretManager(conf), new NMTokenSecretManagerInNM(), null, - new ApplicationACLsManager(conf), stateStore); + new ApplicationACLsManager(conf), stateStore, false); ResourceLocalizationService rawService = new ResourceLocalizationService(dispatcher, exec, delService, dirsHandler, nmContext); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java index 84e42fc..6a72cc0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java @@ -96,7 +96,7 @@ public void testContainerLogDirs() throws IOException, YarnException { healthChecker.init(conf); LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler(); NMContext nmContext = new NodeManager.NMContext(null, null, dirsHandler, - new ApplicationACLsManager(conf), new NMNullStateStoreService()); + new ApplicationACLsManager(conf), new NMNullStateStoreService(), false); // Add an application and the corresponding containers RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(conf); String user = "nobody"; @@ -136,7 +136,7 @@ public void testContainerLogDirs() throws IOException, YarnException { when(dirsHandlerForFullDisk.getLogDirsForRead()). thenReturn(Arrays.asList(new String[] {absLogDir.getAbsolutePath()})); nmContext = new NodeManager.NMContext(null, null, dirsHandlerForFullDisk, - new ApplicationACLsManager(conf), new NMNullStateStoreService()); + new ApplicationACLsManager(conf), new NMNullStateStoreService(), false); nmContext.getApplications().put(appId, app); container.setState(ContainerState.RUNNING); nmContext.getContainers().put(container1, container); @@ -158,7 +158,7 @@ public void testContainerLogFile() throws IOException, YarnException { LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService(); dirsHandler.init(conf); NMContext nmContext = new NodeManager.NMContext(null, null, dirsHandler, - new ApplicationACLsManager(conf), new NMNullStateStoreService()); + new ApplicationACLsManager(conf), new NMNullStateStoreService(), false); // Add an application and the corresponding containers String user = "nobody"; long clusterTimeStamp = 1234; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java index e1845c7..39e8394 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java @@ -87,7 +87,7 @@ private NodeHealthCheckerService createNodeHealthCheckerService(Configuration co private int startNMWebAppServer(String webAddr) { Context nmContext = new NodeManager.NMContext(null, null, null, null, - null); + null, false); ResourceView resourceView = new ResourceView() { @Override public long getVmemAllocatedForContainers() { @@ -150,7 +150,7 @@ public void testNMWebAppWithEphemeralPort() throws IOException { @Test public void testNMWebApp() throws IOException, YarnException { Context nmContext = new NodeManager.NMContext(null, null, null, null, - null); + null, false); ResourceView resourceView = new ResourceView() { @Override public long getVmemAllocatedForContainers() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java index 1f5590c..2ac0956 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java @@ -111,7 +111,7 @@ protected void configureServlets() { healthChecker.init(conf); aclsManager = new ApplicationACLsManager(conf); nmContext = new NodeManager.NMContext(null, null, dirsHandler, - aclsManager, null); + aclsManager, null, false); NodeId nodeId = NodeId.newInstance("testhost.foo.com", 8042); ((NodeManager.NMContext)nmContext).setNodeId(nodeId); resourceView = new ResourceView() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java index e274abb..dfbcf06 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java @@ -104,7 +104,7 @@ protected void configureServlets() { dirsHandler = healthChecker.getDiskHandler(); aclsManager = new ApplicationACLsManager(conf); nmContext = new NodeManager.NMContext(null, null, dirsHandler, - aclsManager, null); + aclsManager, null, false); NodeId nodeId = NodeId.newInstance("testhost.foo.com", 9999); ((NodeManager.NMContext)nmContext).setNodeId(nodeId); resourceView = new ResourceView() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java index 0ed56d3..efad825 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java @@ -132,7 +132,7 @@ public boolean isPmemCheckEnabled() { dirsHandler = healthChecker.getDiskHandler(); aclsManager = new ApplicationACLsManager(conf); nmContext = new NodeManager.NMContext(null, null, dirsHandler, - aclsManager, null) { + aclsManager, null, false) { public NodeId getNodeId() { return NodeId.newInstance("testhost.foo.com", 8042); }; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index ab94175..3f9dd34 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -86,7 +86,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; @@ -278,6 +282,57 @@ public RegisterApplicationMasterResponse registerApplicationMaster( response.setSchedulerResourceTypes(rScheduler .getSchedulingResourceTypes()); + // Ensure both the NM and RM support distributed scheduling + if (request.isDistributedSchedulingEnabled() + && + getConfig().getBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, + YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) { + response.setMinAllocatableCapabilty( + Resource.newInstance( + getConfig().getInt( + YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY, + YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY_DEFAULT), + getConfig().getInt( + YarnConfiguration.DIST_SCHEDULING_MIN_VCORES, + YarnConfiguration.DIST_SCHEDULING_MIN_VCORES_DEFAULT) + ) + ); + response.setMaxAllocatableCapabilty( + Resource.newInstance( + getConfig().getInt( + YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY, + YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY_DEFAULT), + getConfig().getInt( + YarnConfiguration.DIST_SCHEDULING_MAX_VCORES, + YarnConfiguration.DIST_SCHEDULING_MAX_VCORES_DEFAULT) + ) + ); + response.setMaxQueueableContainers( + getConfig().getInt( + YarnConfiguration.DIST_SCHEDULING_MAX_QUEUEABLE_CONTAINERS, + YarnConfiguration. + DIST_SCHEDULING_MAX_QUEUEABLE_CONTAINERS_DEFAULT)); + response.setMaxQueueableContainersPerNode( + getConfig().getInt( + YarnConfiguration. + DIST_SCHEDULING_MAX_QUEUEABLE_CONTAINERS_PER_NODE, + YarnConfiguration. + DIST_SCHEDULING_MAX_QUEUEABLE_CONTAINERS_PER_NODE_DEFAULT)); + response.setContainerTokenExpiryInterval( + getConfig().getInt( + YarnConfiguration.DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS, + YarnConfiguration. + DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT)); + response.setLocalityEnabled( + getConfig().getBoolean( + YarnConfiguration.DIST_SCHEDULING_LOCALITY_ENABLED, + YarnConfiguration.DIST_SCHEDULING_LOCALITY_ENABLED_DEFAULT)); + response.setContainerIdStart( + this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT); + response.setNodeList( + new ArrayList<>(this.rmContext.getRMNodes().keySet())); + } + return response; } } @@ -509,6 +564,14 @@ public AllocateResponse allocate(AllocateRequest request) request.getIncreaseRequests(), request.getDecreaseRequests()); } + SchedulerApplicationAttempt schedulerAppAttempt = ( + (AbstractYarnScheduler) this.rScheduler) + .getApplicationAttempt(appAttemptId); + List newNodes = schedulerAppAttempt + .getAppSchedulingInfo().getAndClearAddedNodes(); + List removedNodes = schedulerAppAttempt + .getAppSchedulingInfo().getAndClearRemovedNodes(); + if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) { LOG.info("blacklist are updated in Scheduler." + "blacklistAdditions: " + blacklistAdditions + ", " + @@ -567,6 +630,10 @@ public AllocateResponse allocate(AllocateRequest request) allocateResponse.setApplicationPriority(app .getApplicationSubmissionContext().getPriority()); + // Set New Nodes and Removed Nodes + allocateResponse.setNewNodes(newNodes); + allocateResponse.setRemovedNodes(removedNodes); + // update AMRMToken if the token is rolled-up MasterKeyData nextMasterKey = this.rmContext.getAMRMTokenSecretManager().getNextMasterKeyData(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 01a1c8f..4b476a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -120,6 +120,11 @@ */ public static final int SHUTDOWN_HOOK_PRIORITY = 30; + /** + * Used for generation of various ids. + */ + public static final int EPOCH_BIT_SHIFT = 40; + private static final Log LOG = LogFactory.getLog(ResourceManager.class); private static long clusterTimeStamp = System.currentTimeMillis(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index bd24b25..1d308d6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -72,6 +72,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; @@ -372,6 +378,18 @@ public RegisterNodeManagerResponse registerNodeManager( StringUtils.join(",", nodeLabels) + " } "); } + // For Distributed Scheduling, let all apps know that a new node is + // available for distributed scheduling + for (SchedulerApplication app : + ((AbstractYarnScheduler) this.rmContext.getScheduler()) + .getAllApplications()) { + SchedulerApplicationAttempt currentAppAttempt = app + .getCurrentAppAttempt(); + if (currentAppAttempt != null) { + currentAppAttempt.getAppSchedulingInfo().nodeAdded(rmNode + .getNodeID()); + } + } LOG.info(message.toString()); response.setNodeAction(NodeAction.NORMAL); response.setRMIdentifier(ResourceManager.getClusterTimeStamp()); @@ -517,6 +535,18 @@ public UnRegisterNodeManagerResponse unRegisterNodeManager( this.nmLivelinessMonitor.unregister(nodeId); this.rmContext.getDispatcher().getEventHandler() .handle(new RMNodeEvent(nodeId, RMNodeEventType.SHUTDOWN)); + // For Distributed Scheduling, let all apps know that a node is + // no longer available for distributed scheduling + for (SchedulerApplication app : + ((AbstractYarnScheduler) this.rmContext.getScheduler()) + .getAllApplications()) { + SchedulerApplicationAttempt currentAppAttempt = app + .getCurrentAppAttempt(); + if (currentAppAttempt != null) { + currentAppAttempt.getAppSchedulingInfo().nodeRemoved(rmNode + .getNodeID()); + } + } return response; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 146b0d3..772393e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -67,6 +67,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; @@ -965,6 +972,19 @@ public static void deactivateNode(RMNodeImpl rmNode, NodeState finalState) { reportNodeUnusable(rmNode, finalState); + // For Distributed Scheduling, let all apps know that a node is + // no longer available for distributed scheduling + for (SchedulerApplication app : + ((AbstractYarnScheduler) rmNode.context.getScheduler()) + .getAllApplications()) { + SchedulerApplicationAttempt currentAppAttempt = app + .getCurrentAppAttempt(); + if (currentAppAttempt != null) { + currentAppAttempt.getAppSchedulingInfo().nodeRemoved(rmNode + .getNodeID()); + } + } + // Deactivate the node rmNode.context.getRMNodes().remove(rmNode.nodeId); LOG.info("Deactivating Node " + rmNode.nodeId + " as it is now " diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index abd72bf..9410cf9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -300,6 +300,10 @@ public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) { return app == null ? null : app.getCurrentAppAttempt(); } + public List> getAllApplications() { + return new ArrayList<>(applications.values()); + } + @Override public SchedulerAppReport getSchedulerAppInfo( ApplicationAttemptId appAttemptId) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index c5f8cd1..32f6de9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; @@ -63,7 +64,6 @@ final String user; // TODO making containerIdCounter long private final AtomicLong containerIdCounter; - private final int EPOCH_BIT_SHIFT = 40; final Set priorities = new TreeSet( new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator()); @@ -82,6 +82,11 @@ boolean pending = true; // for app metrics private ResourceUsage appResourceUsage; + + // Used to notify the app that nodes have been added / removed + // Useful when Distributed Scheduling is enabled + private List newNodes = new ArrayList<>(); + private List removedNodes = new ArrayList<>(); public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user, Queue queue, ActiveUsersManager activeUsersManager, @@ -92,7 +97,8 @@ public AppSchedulingInfo(ApplicationAttemptId appAttemptId, this.queueName = queue.getQueueName(); this.user = user; this.activeUsersManager = activeUsersManager; - this.containerIdCounter = new AtomicLong(epoch << EPOCH_BIT_SHIFT); + this.containerIdCounter = + new AtomicLong(epoch << ResourceManager.EPOCH_BIT_SHIFT); this.appResourceUsage = appResourceUsage; } @@ -126,7 +132,9 @@ private synchronized void clearRequests() { } public long getNewContainerId() { - return this.containerIdCounter.incrementAndGet(); + // For Distributed Scheduling, to differentiate containerIds generated + // by the RM and NM. Those generated by the RM are always even numbers. + return this.containerIdCounter.addAndGet(2); } public boolean hasIncreaseRequest(NodeId nodeId) { @@ -816,4 +824,24 @@ public ResourceRequest cloneResourceRequest(ResourceRequest request) { request.getRelaxLocality(), request.getNodeLabelExpression()); return newRequest; } + + public synchronized void nodeAdded(NodeId node) { + newNodes.add(node); + } + + public synchronized void nodeRemoved(NodeId node) { + removedNodes.add(node); + } + + public synchronized List getAndClearAddedNodes() { + List retList = new ArrayList<>(newNodes); + newNodes.clear(); + return retList; + } + + public synchronized List getAndClearRemovedNodes() { + List retList = new ArrayList<>(removedNodes); + removedNodes.clear(); + return retList; + } }