diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index 85e4181471b..4d0d0748da0 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -111,6 +111,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.client.api.TimelineV2Client; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; @@ -1769,8 +1770,8 @@ public MyFifoScheduler(RMContext rmContext) { @Override public synchronized Allocation allocate( ApplicationAttemptId applicationAttemptId, List ask, - List release, List blacklistAdditions, - List blacklistRemovals, + List schedulingRequests, List release, + List blacklistAdditions, List blacklistRemovals, ContainerUpdates updateRequests) { List askCopy = new ArrayList(); for (ResourceRequest req : ask) { @@ -1785,7 +1786,7 @@ public synchronized Allocation allocate( lastBlacklistAdditions = blacklistAdditions; lastBlacklistRemovals = blacklistRemovals; Allocation allocation = super.allocate( - applicationAttemptId, askCopy, release, blacklistAdditions, + applicationAttemptId, askCopy, schedulingRequests, release, blacklistAdditions, blacklistRemovals, updateRequests); if (forceResourceLimit != null) { // Test wants to force the non-default resource limit 
@@ -1815,8 +1816,8 @@ public ExcessReduceContainerAllocateScheduler(RMContext rmContext) { @Override public synchronized Allocation allocate( ApplicationAttemptId applicationAttemptId, List ask, - List release, List blacklistAdditions, - List blacklistRemovals, + List schedulingRequests, List release, + List blacklistAdditions, List blacklistRemovals, ContainerUpdates updateRequests) { List askCopy = new ArrayList(); for (ResourceRequest req : ask) { @@ -1827,7 +1828,7 @@ public synchronized Allocation allocate( } SecurityUtil.setTokenServiceUseIp(false); Allocation normalAlloc = super.allocate( - applicationAttemptId, askCopy, release, + applicationAttemptId, askCopy, schedulingRequests, release, blacklistAdditions, blacklistRemovals, updateRequests); List containers = normalAlloc.getContainers(); if(containers.size() > 0) { diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java index 6848b22b53e..35f3ed166f5 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSCapacityScheduler.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; @@ -42,9 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; @@ -100,16 +99,17 @@ public void setConf(Configuration conf) { @Override public Allocation allocate(ApplicationAttemptId attemptId, - List resourceRequests, List containerIds, - List strings, List strings2, - ContainerUpdates updateRequests) { + List resourceRequests, + List schedulingRequests, List containerIds, + List strings, List strings2, ContainerUpdates updateRequests) { if (metricsON) { final Timer.Context context = schedulerMetrics.getSchedulerAllocateTimer() .time(); Allocation allocation = null; try { allocation = super - .allocate(attemptId, resourceRequests, containerIds, strings, + .allocate(attemptId, resourceRequests, schedulingRequests, + containerIds, strings, strings2, updateRequests); return allocation; } finally { @@ -123,7 +123,8 @@ public Allocation allocate(ApplicationAttemptId attemptId, } } } else { - return super.allocate(attemptId, resourceRequests, containerIds, strings, + return super.allocate(attemptId, resourceRequests, schedulingRequests, + containerIds, strings, strings2, updateRequests); } } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java index 8e49c517ec2..c27ab3eed41 100644 --- 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/SLSFairScheduler.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; @@ -39,8 +40,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.sls.SLSRunner; import org.apache.hadoop.yarn.sls.conf.SLSConfiguration; @@ -94,7 +93,8 @@ public void setConf(Configuration conf) { @Override public Allocation allocate(ApplicationAttemptId attemptId, - List resourceRequests, List containerIds, + List resourceRequests, + List schedulingRequests, List containerIds, List blacklistAdditions, List blacklistRemovals, ContainerUpdates updateRequests) { if (metricsON) { @@ -102,7 +102,8 @@ public Allocation allocate(ApplicationAttemptId attemptId, .time(); Allocation allocation = null; try { - allocation = super.allocate(attemptId, resourceRequests, containerIds, + allocation = super.allocate(attemptId, resourceRequests, + schedulingRequests, containerIds, blacklistAdditions, blacklistRemovals, updateRequests); return 
allocation; } finally { @@ -116,7 +117,8 @@ public Allocation allocate(ApplicationAttemptId attemptId, } } } else { - return super.allocate(attemptId, resourceRequests, containerIds, + return super.allocate(attemptId, resourceRequests, schedulingRequests, + containerIds, blacklistAdditions, blacklistRemovals, updateRequests); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/SchedulerInvalidResoureRequestException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/SchedulerInvalidResoureRequestException.java new file mode 100644 index 00000000000..2d2562450fb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/SchedulerInvalidResoureRequestException.java @@ -0,0 +1,29 @@ +package org.apache.hadoop.yarn.exceptions; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * This exception is thrown when the scheduler fails to handle a new or + * updated {@link org.apache.hadoop.yarn.api.records.SchedulingRequest}/ + * {@link org.apache.hadoop.yarn.api.records.ResourceRequest} added to the + * scheduler.
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class SchedulerInvalidResoureRequestException extends YarnRuntimeException { + private static final long serialVersionUID = 10081123982L; + + public SchedulerInvalidResoureRequestException(String message) { + super(message); + } + + public SchedulerInvalidResoureRequestException(Throwable cause) { + super(cause); + } + + public SchedulerInvalidResoureRequestException(String message, + Throwable cause) { + super(message, cause); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java index 337d7d4af70..7f5b59be9ec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; @@ -563,8 +564,8 @@ public MyFifoScheduler(RMContext rmContext) { @Override public synchronized Allocation allocate( ApplicationAttemptId applicationAttemptId, List ask, - List release, List blacklistAdditions, - List blacklistRemovals, + List schedulingRequests, List release, + List blacklistAdditions, List blacklistRemovals, ContainerUpdates updateRequests) { List askCopy = new ArrayList(); for (ResourceRequest req : ask) { @@ -580,7 +581,8 @@ 
public synchronized Allocation allocate( lastDecrease = updateRequests.getDecreaseRequests(); lastBlacklistAdditions = blacklistAdditions; lastBlacklistRemovals = blacklistRemovals; - return super.allocate(applicationAttemptId, askCopy, release, + return super.allocate(applicationAttemptId, askCopy, schedulingRequests, + release, blacklistAdditions, blacklistRemovals, updateRequests); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java index c4f37f62649..0fce083b5ef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/SchedulerRequestKey.java @@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; /** @@ -45,6 +46,16 @@ public static SchedulerRequestKey create(ResourceRequest req) { req.getAllocationRequestId(), null); } + /** + * Factory method to generate a SchedulerRequestKey from a SchedulingRequest. 
+ * @param req SchedulingRequest + * @return SchedulerRequestKey + */ + public static SchedulerRequestKey create(SchedulingRequest req) { + return new SchedulerRequestKey(req.getPriority(), + req.getAllocationRequestId(), null); + } + public static SchedulerRequestKey create(UpdateContainerRequest req, SchedulerRequestKey schedulerRequestKey) { return new SchedulerRequestKey(schedulerRequestKey.getPriority(), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsNamespaces.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementTagsConstants.java similarity index 76% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsNamespaces.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementTagsConstants.java index 43fcfe5bb39..60296a9dae1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsNamespaces.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementTagsConstants.java @@ -18,7 +18,10 @@ * / */ -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; +package org.apache.hadoop.yarn.api.resource; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; /** * Predefined namespaces for tags @@ -26,6 +29,9 @@ * Same as namespace of resource types. 
Namespaces of placement tags are start * with alphabets and ended with "/" */ -public class AllocationTagsNamespaces { +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class PlacementTagsConstants { public static final String APP_ID = "yarn_app_id/"; + public static final String NODE_PARTITION_KEY = "yarn_node_partition"; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java index 713947fe5b7..18ab473d3c6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java @@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException; import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; +import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -273,10 +274,14 @@ public void allocate(ApplicationAttemptId appAttemptId, " state, ignore container allocate request."); allocation = EMPTY_ALLOCATION; } else { - allocation = - getScheduler().allocate(appAttemptId, ask, release, - blacklistAdditions, blacklistRemovals, - containerUpdateRequests); + try { + allocation = getScheduler().allocate(appAttemptId, ask, + 
request.getSchedulingRequests(), release, + blacklistAdditions, blacklistRemovals, containerUpdateRequests); + } catch (SchedulerInvalidResoureRequestException e) { + LOG.warn("Exceptions caught when scheduler handling requests"); + throw new YarnException(e); + } } if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 2d5c6a3ab54..f886143086c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -1112,8 +1112,7 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, Allocation amContainerAllocation = appAttempt.scheduler.allocate( appAttempt.applicationAttemptId, - appAttempt.amReqs, - EMPTY_CONTAINER_RELEASE_LIST, + appAttempt.amReqs, null, EMPTY_CONTAINER_RELEASE_LIST, amBlacklist.getBlacklistAdditions(), amBlacklist.getBlacklistRemovals(), new ContainerUpdates()); @@ -1139,7 +1138,7 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, // Acquire the AM container from the scheduler. 
Allocation amContainerAllocation = appAttempt.scheduler.allocate(appAttempt.applicationAttemptId, - EMPTY_CONTAINER_REQUEST_LIST, EMPTY_CONTAINER_RELEASE_LIST, null, + EMPTY_CONTAINER_REQUEST_LIST, null, EMPTY_CONTAINER_RELEASE_LIST, null, null, new ContainerUpdates()); // There must be at least one container allocated, because a // CONTAINER_ALLOCATED is emitted after an RMContainer is constructed, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 1589d844975..9a3a8e461f7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.ResourceSizing; import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerError; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; @@ -1150,12 +1151,29 @@ public Resource getNormalizedResource(Resource requestedResource) { * * @param asks resource requests */ - protected void normalizeRequests(List asks) { + protected void normalizeResourceRequests(List asks) { for (ResourceRequest ask: asks) { ask.setCapability(getNormalizedResource(ask.getCapability())); } } + /** + * Normalize a list 
of SchedulingRequest + * + * @param asks scheduling request + */ + protected void normalizeSchedulingRequests(List asks) { + if (asks == null) { + return; + } + for (SchedulingRequest ask: asks) { + ResourceSizing sizing = ask.getResourceSizing(); + if (sizing != null && sizing.getResources() != null) { + sizing.setResources(getNormalizedResource(sizing.getResources())); + } + } + } + protected void handleContainerUpdates( SchedulerApplicationAttempt appAttempt, ContainerUpdates updates) { List promotionRequests = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index e47f0c1d614..7886a18e5c9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; @@ -50,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalityAppPlacementAllocator; 
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PendingAskUpdateResult; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SingleConstraintAppPlacementAllocator; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.util.resource.Resources; /** @@ -160,58 +162,139 @@ public ContainerUpdateContext getUpdateContext() { * application, by asking for more resources and releasing resources acquired * by the application. * - * @param requests - * resources to be acquired + * @param resourceRequests resource requests to be allocated * @param recoverPreemptedRequestForAContainer - * recover ResourceRequest on preemption + * recover ResourceRequest/SchedulingRequest on preemption * @return true if any resource was updated, false otherwise */ - public boolean updateResourceRequests(List requests, + public boolean updateResourceRequests(List resourceRequests, boolean recoverPreemptedRequestForAContainer) { - if (null == requests || requests.isEmpty()) { - return false; - } - // Flag to track if any incoming requests update "ANY" requests - boolean offswitchResourcesUpdated = false; + boolean offswitchResourcesUpdated; + writeLock.lock(); try { - this.writeLock.lock(); - - // A map to group resource requests and dedup - Map> dedupRequests = - new HashMap<>(); + // Update AppPlacementAllocator by requests + offswitchResourcesUpdated = addResourceRequests( + recoverPreemptedRequestForAContainer, resourceRequests); + } finally { + writeLock.unlock(); + } - // Group resource request by schedulerRequestKey and resourceName - for (ResourceRequest request : requests) { - SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request); - if (!dedupRequests.containsKey(schedulerKey)) { - dedupRequests.put(schedulerKey, new HashMap<>()); - } - dedupRequests.get(schedulerKey).put(request.getResourceName(), request); - } + return offswitchResourcesUpdated; + } - // Update 
AppPlacementAllocator by dedup requests. - offswitchResourcesUpdated = - addRequestToAppPlacement( - recoverPreemptedRequestForAContainer, dedupRequests); + /** + * The ApplicationMaster is updating resource requirements for the + * application, by asking for more resources and releasing resources acquired + * by the application. + * + * @param schedulingRequests resource requests to be allocated + * @param recoverPreemptedRequestForAContainer + * recover ResourceRequest/SchedulingRequest on preemption + * @return true if any resource was updated, false otherwise + */ + public boolean updateSchedulingRequests( + List schedulingRequests, + boolean recoverPreemptedRequestForAContainer) { + // Flag to track if any incoming requests update "ANY" requests + boolean offswitchResourcesUpdated; - return offswitchResourcesUpdated; + writeLock.lock(); + try { + // Update AppPlacementAllocator by requests + offswitchResourcesUpdated = addSchedulingRequests( + recoverPreemptedRequestForAContainer, schedulingRequests); } finally { - this.writeLock.unlock(); + writeLock.unlock(); } + + return offswitchResourcesUpdated; } public void removeAppPlacement(SchedulerRequestKey schedulerRequestKey) { schedulerKeyToAppPlacementAllocator.remove(schedulerRequestKey); } - boolean addRequestToAppPlacement( + private boolean addSchedulingRequests( boolean recoverPreemptedRequestForAContainer, - Map> dedupRequests) { + List schedulingRequests) { + // Do we need to update pending resource for app/queue, etc.? 
+ boolean requireUpdatePendingResource = false; + + Map schedulingRequestMap = + new HashMap<>(); + for (SchedulingRequest req : schedulingRequests) { + schedulingRequestMap.put(SchedulerRequestKey.create(req), req); + } + + for (Map.Entry entry : schedulingRequestMap + .entrySet()) { + SchedulerRequestKey schedulerRequestKey = entry.getKey(); + SchedulingRequest schedulingRequest = entry.getValue(); + + AppPlacementAllocator appPlacementAllocator = getAppPlacementAllocator( + schedulerRequestKey, schedulingRequest); + + // Update AppPlacementAllocator + PendingAskUpdateResult pendingAmountChanges = + appPlacementAllocator.updatePendingAsk(schedulerRequestKey, + schedulingRequest, recoverPreemptedRequestForAContainer); + + if (null != pendingAmountChanges) { + updatePendingResources(pendingAmountChanges, schedulerRequestKey, + queue.getMetrics()); + requireUpdatePendingResource = true; + } + } + + return requireUpdatePendingResource; + } + + /* + * Get the app placement allocator for the given scheduler request key and + * request. Throws an exception if an incompatible request is specified, for + * example a SchedulingRequest made against a scheduler request key which is + * associated with a ResourceRequest. + */ + private AppPlacementAllocator getAppPlacementAllocator( + SchedulerRequestKey schedulerRequestKey, Object request) { + AppPlacementAllocator allocator = schedulerKeyToAppPlacementAllocator.get( + schedulerRequestKey); + + if (allocator == null) { + schedulerKeyToAppPlacementAllocator.put(schedulerRequestKey, + // FIXME, use factory to get the allocator. (once YARN-6599 merged) + request instanceof ResourceRequest ?
+ new LocalityAppPlacementAllocator<>(this) : + new SingleConstraintAppPlacementAllocator<>(schedulerRequestKey, + this)); + } + return allocator; + } + + private boolean addResourceRequests(boolean recoverPreemptedRequestForAContainer, + List resourceRequests) { + if (null == resourceRequests || resourceRequests.isEmpty()) { + return false; + } + + // A map to group resource requests and dedup + Map> dedupRequests = + new HashMap<>(); + + // Group resource request by schedulerRequestKey and resourceName + for (ResourceRequest request : resourceRequests) { + SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request); + if (!dedupRequests.containsKey(schedulerKey)) { + dedupRequests.put(schedulerKey, new HashMap<>()); + } + dedupRequests.get(schedulerKey).put(request.getResourceName(), request); + } + boolean offswitchResourcesUpdated = false; - for (Map.Entry> entry : - dedupRequests.entrySet()) { + for (Map.Entry> entry : dedupRequests + .entrySet()) { SchedulerRequestKey schedulerRequestKey = entry.getKey(); if (!schedulerKeyToAppPlacementAllocator.containsKey( @@ -220,15 +303,16 @@ boolean addRequestToAppPlacement( new LocalityAppPlacementAllocator<>(this)); } + AppPlacementAllocator appPlacementAllocator = getAppPlacementAllocator( + schedulerRequestKey, resourceRequests.get(0)); + // Update AppPlacementAllocator PendingAskUpdateResult pendingAmountChanges = - schedulerKeyToAppPlacementAllocator.get(schedulerRequestKey) - .updatePendingAsk(entry.getValue().values(), - recoverPreemptedRequestForAContainer); + appPlacementAllocator.updatePendingAsk(entry.getValue().values(), + recoverPreemptedRequestForAContainer); if (null != pendingAmountChanges) { - updatePendingResources( - pendingAmountChanges, schedulerRequestKey, + updatePendingResources(pendingAmountChanges, schedulerRequestKey, queue.getMetrics()); offswitchResourcesUpdated = true; } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java index f410db12f32..3b5a1bac6a4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java @@ -32,15 +32,16 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer - .RMContainerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.util.resource.Resources; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; @@ -140,13 +141,12 @@ public synchronized boolean checkAndAddToOutstandingIncreases( containerIds.add(container.getId()); if (!Resources.isNone(resToIncrease)) { - Map> updateResReqs = - new HashMap<>(); - Map resMap = - createResourceRequests(rmContainer, schedulerNode, - schedulerKey, resToIncrease); - updateResReqs.put(schedulerKey, resMap); - 
appSchedulingInfo.addRequestToAppPlacement(false, updateResReqs); + List resRequestsForContainer = createResourceRequests( + rmContainer, schedulerNode, schedulerKey, resToIncrease); + if (resRequestsForContainer != null) { + appSchedulingInfo.updateResourceRequests(resRequestsForContainer, + false); + } } return true; } @@ -170,23 +170,23 @@ private void cancelPreviousRequest(SchedulerNode schedulerNode, } } - private Map createResourceRequests( - RMContainer rmContainer, SchedulerNode schedulerNode, - SchedulerRequestKey schedulerKey, Resource resToIncrease) { - Map resMap = new HashMap<>(); - resMap.put(rmContainer.getContainer().getNodeId().getHost(), + private List createResourceRequests(RMContainer rmContainer, + SchedulerNode schedulerNode, SchedulerRequestKey schedulerKey, + Resource resToIncrease) { + List resourceRequestList = new ArrayList<>(); + resourceRequestList.add( createResourceReqForIncrease(schedulerKey, resToIncrease, RECORD_FACTORY.newRecordInstance(ResourceRequest.class), rmContainer, rmContainer.getContainer().getNodeId().getHost())); - resMap.put(schedulerNode.getRackName(), + resourceRequestList.add( createResourceReqForIncrease(schedulerKey, resToIncrease, RECORD_FACTORY.newRecordInstance(ResourceRequest.class), rmContainer, schedulerNode.getRackName())); - resMap.put(ResourceRequest.ANY, + resourceRequestList.add( createResourceReqForIncrease(schedulerKey, resToIncrease, RECORD_FACTORY.newRecordInstance(ResourceRequest.class), rmContainer, ResourceRequest.ANY)); - return resMap; + return resourceRequestList; } private Resource getResourceToIncrease(UpdateContainerRequest updateReq, @@ -284,13 +284,11 @@ public ContainerId matchContainerToOutstandingIncreaseReq( // we need.. We need to signal that this container has to be released. // We also need to add these requests back.. to be reallocated. 
if (resourceMap != null && retVal == null) { - Map> reqsToUpdate = - new HashMap<>(); - Map resMap = createResourceRequests - (rmContainer, node, schedulerKey, - rmContainer.getContainer().getResource()); - reqsToUpdate.put(schedulerKey, resMap); - appSchedulingInfo.addRequestToAppPlacement(true, reqsToUpdate); + List resRequests = createResourceRequests(rmContainer, + node, schedulerKey, rmContainer.getContainer().getResource()); + if (resRequests != null) { + appSchedulingInfo.updateResourceRequests(resRequests, true); + } return UNDEFINED; } return retVal; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index dfb0e67fcd9..f628d08036b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerError; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.api.ContainerType; @@ -448,6 +449,23 @@ public boolean updateResourceRequests( writeLock.unlock(); } } + + public boolean updateSchedulingRequests( + List requests) { + if (requests == null) 
{ + return false; + } + + try { + writeLock.lock(); + if (!isStopped) { + return appSchedulingInfo.updateSchedulingRequests(requests, false); + } + return false; + } finally { + writeLock.unlock(); + } + } public void recoverResourceRequestsForContainer( ContainerRequest containerRequest) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index 93ca7c2ea18..43d55c4f1dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -132,18 +133,18 @@ public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues, * * @param appAttemptId * @param ask + * @param schedulingRequests * @param release - * @param blacklistAdditions - * @param blacklistRemovals - * @param updateRequests - * @return the {@link Allocation} for the application + * @param blacklistAdditions + * @param blacklistRemovals + * @param updateRequests @return the {@link Allocation} for the application */ @Public @Stable Allocation 
allocate(ApplicationAttemptId appAttemptId, - List ask, List release, - List blacklistAdditions, List blacklistRemovals, - ContainerUpdates updateRequests); + List ask, List schedulingRequests, + List release, List blacklistAdditions, + List blacklistRemovals, ContainerUpdates updateRequests); /** * Get node resource usage report. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index f03d7d1f2e8..dcfd58b4d22 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -1014,8 +1014,8 @@ private void doneApplicationAttempt( @Override @Lock(Lock.NoLock.class) public Allocation allocate(ApplicationAttemptId applicationAttemptId, - List ask, List release, - List blacklistAdditions, List blacklistRemovals, + List ask, List schedulingRequests, + List release, List blacklistAdditions, List blacklistRemovals, ContainerUpdates updateRequests) { FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId); if (application == null) { @@ -1044,7 +1044,10 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId, LeafQueue updateDemandForQueue = null; // Sanity check for new allocation requests - normalizeRequests(ask); + normalizeResourceRequests(ask); + + // Normalize scheduling requests + normalizeSchedulingRequests(schedulingRequests); Allocation allocation; @@ -1066,7 +1069,8 @@ 
public Allocation allocate(ApplicationAttemptId applicationAttemptId, } // Update application requests - if (application.updateResourceRequests(ask)) { + if (application.updateResourceRequests(ask) || application + .updateSchedulingRequests(schedulingRequests)) { updateDemandForQueue = (LeafQueue) application.getQueue(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java index 85d8715bb2a..2ed3e837188 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/PendingAsk.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceSizing; import org.apache.hadoop.yarn.util.resource.Resources; /** @@ -31,6 +32,11 @@ private final int count; public final static PendingAsk ZERO = new PendingAsk(Resources.none(), 0); + public PendingAsk(ResourceSizing sizing) { + this.perAllocationResource = sizing.getResources(); + this.count = sizing.getNumAllocations(); + } + public PendingAsk(Resource res, int num) { this.perAllocationResource = res; this.count = num; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java index 4bb3e795f33..14a56a628cb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.api.resource.PlacementTagsConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.log4j.Logger; @@ -292,7 +293,7 @@ public void addContainer(NodeId nodeId, ContainerId containerId, ApplicationId applicationId = containerId.getApplicationAttemptId().getApplicationId(); String applicationIdTag = - AllocationTagsNamespaces.APP_ID + applicationId.toString(); + PlacementTagsConstants.APP_ID + applicationId.toString(); boolean useSet = false; if (allocationTags != null && !allocationTags.isEmpty()) { @@ -346,7 +347,7 @@ public void removeContainer(NodeId nodeId, ApplicationId applicationId = containerId.getApplicationAttemptId().getApplicationId(); String applicationIdTag = - AllocationTagsNamespaces.APP_ID + applicationId.toString(); + PlacementTagsConstants.APP_ID + applicationId.toString(); boolean useSet = false; if (allocationTags != null && !allocationTags.isEmpty()) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index ebc7222ccf3..621c142915b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -781,9 +782,9 @@ public void killContainer(RMContainer container) { @Override public Allocation allocate(ApplicationAttemptId appAttemptId, - List ask, List release, - List blacklistAdditions, List blacklistRemovals, - ContainerUpdates updateRequests) { + List ask, List schedulingRequests, + List release, List blacklistAdditions, + List blacklistRemovals, ContainerUpdates updateRequests) { // Make sure this application exists FSAppAttempt application = getSchedulerApp(appAttemptId); @@ -808,7 +809,9 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, handleContainerUpdates(application, updateRequests); // Sanity check - normalizeRequests(ask); + normalizeResourceRequests(ask); + + // TODO, normalize SchedulingRequest // Record container allocation start time application.recordContainerRequestTime(getClock().getTime()); @@ -830,6 +833,7 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, // Update application requests 
application.updateResourceRequests(ask); + // TODO, handle SchedulingRequest application.showRequests(); } } finally { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 826575d31d3..7c46f5715ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -322,8 +323,8 @@ public synchronized void setRMContext(RMContext rmContext) { @Override public Allocation allocate(ApplicationAttemptId applicationAttemptId, - List ask, List release, - List blacklistAdditions, List blacklistRemovals, + List ask, List schedulingRequests, + List release, List blacklistAdditions, List blacklistRemovals, ContainerUpdates updateRequests) { FifoAppAttempt application = getApplicationAttempt(applicationAttemptId); if (application == null) { @@ -344,7 +345,7 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId, } // Sanity check - normalizeRequests(ask); + normalizeResourceRequests(ask); // Release containers 
releaseContainers(release, application); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java index dcb38aa0054..af138202aaf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; @@ -70,6 +71,20 @@ PendingAskUpdateResult updatePendingAsk( boolean recoverPreemptedRequestForAContainer); /** + * Replace existing pending asks by the new SchedulingRequest + * + * @param schedulerRequestKey scheduler request key + * @param schedulingRequest new asks + * @param recoverPreemptedRequestForAContainer if we're recovering resource + * requests for preempted container + * @return true if total pending resource changed + */ + PendingAskUpdateResult updatePendingAsk( + SchedulerRequestKey schedulerRequestKey, + SchedulingRequest schedulingRequest, + boolean recoverPreemptedRequestForAContainer); + + /** * Get pending ResourceRequests by given 
schedulerRequestKey * @return Map of resourceName to ResourceRequest */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java index 766827ca300..f4f62df0a33 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java @@ -22,6 +22,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; @@ -176,6 +178,19 @@ public PendingAskUpdateResult updatePendingAsk( } @Override + public PendingAskUpdateResult updatePendingAsk( + SchedulerRequestKey schedulerRequestKey, + SchedulingRequest schedulingRequest, + boolean recoverPreemptedRequestForAContainer) + throws SchedulerInvalidResoureRequestException { + throw new SchedulerInvalidResoureRequestException(this.getClass().getName() + + " not be able to handle SchedulingRequest, there exists a " + + "ResourceRequest with the same scheduler key=" + 
schedulerRequestKey + + ", please send SchedulingRequest with a different allocationId and " + + "priority"); + } + + @Override public Map getResourceRequests() { return resourceRequestMap; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java new file mode 100644 index 00000000000..ba3a1e53bf5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java @@ -0,0 +1,408 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; + +import org.apache.commons.collections.IteratorUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.ResourceSizing; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint; +import org.apache.hadoop.yarn.api.resource.PlacementConstraints; +import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static org.apache.hadoop.yarn.api.resource.PlacementTagsConstants.APP_ID; +import static org.apache.hadoop.yarn.api.resource.PlacementTagsConstants.NODE_PARTITION_KEY; + +/** + * This is a simple implementation to do affinity or anti-affinity for + * inter/intra apps. + * + * For now we limit it to handle node scope anti-affinity for intra-app only + * because of test bandwidth limitation. See validateAndSetSchedulingRequest. 
+ */ +public class SingleConstraintAppPlacementAllocator + implements AppPlacementAllocator { + private static final Log LOG = + LogFactory.getLog(SingleConstraintAppPlacementAllocator.class); + + private ReentrantReadWriteLock.ReadLock readLock; + private ReentrantReadWriteLock.WriteLock writeLock; + private SchedulingRequest schedulingRequest = null; + private AppSchedulingInfo appSchedulingInfo; + private String targetNodePartition; + private Set targetAllocationTags; + private SchedulerRequestKey schedulerRequestKey; + + public SingleConstraintAppPlacementAllocator( + SchedulerRequestKey schedulerRequestKey, AppSchedulingInfo info) { + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); + this.appSchedulingInfo = info; + this.schedulerRequestKey = schedulerRequestKey; + } + + @Override + @SuppressWarnings("unchecked") + public Iterator getPreferredNodeIterator( + CandidateNodeSet candidateNodeSet) { + // Now only handle the case that single node in the candidateNodeSet + // TODO, Add support to multi-hosts inside candidateNodeSet which is passed + // in. 
+ + N singleNode = CandidateNodeSetUtils.getSingleNode(candidateNodeSet); + if (null != singleNode) { + return IteratorUtils.singletonIterator(singleNode); + } + + return IteratorUtils.emptyIterator(); } + + @Override + public PendingAskUpdateResult updatePendingAsk( + Collection requests, + boolean recoverPreemptedRequestForAContainer) { + if (requests != null && !requests.isEmpty()) { + throw new SchedulerInvalidResoureRequestException( + this.getClass().getName() + + " not be able to handle ResourceRequest, there exists a " + + "SchedulingRequest with the same scheduler key=" + + SchedulerRequestKey.create(requests.iterator().next()) + + ", please send ResourceRequest with a different allocationId and " + + "priority"); + } + + // Do nothing + return null; + } + + private PendingAskUpdateResult internalUpdatePendingAsk( + SchedulerRequestKey schedulerRequestKey, + SchedulingRequest newSchedulingRequest) { + if (schedulingRequest != null) { + // If we have an old scheduling request, we will make sure that no changes + // made except #containers. + // We do this by replacing numAllocations with old numAllocations in the + // newSchedulingRequest#getResourceSizing, and compare the two objects. + ResourceSizing sizing = newSchedulingRequest.getResourceSizing(); + int newNumAllocations = sizing.getNumAllocations(); + int existingNumAllocations = + schedulingRequest.getResourceSizing().getNumAllocations(); + sizing.setNumAllocations(existingNumAllocations); + + // Compare two objects + if (!schedulingRequest.equals(newSchedulingRequest)) { + // Rollback #numAllocations + sizing.setNumAllocations(newNumAllocations); + throw new SchedulerInvalidResoureRequestException( + "Invalid updated SchedulingRequest added to scheduler, " + + " we only allows changing numAllocations for the updated " + + "SchedulingRequest. 
Old=" + schedulingRequest.toString() + + " new=" + newSchedulingRequest.toString() + + ", if any fields need to be updated, please cancel the " + + "old request (by setting numAllocations to 0) and send a " + + "SchedulingRequest with different combination of " + + "priority/allocationId"); + } + + // Rollback #numAllocations + sizing.setNumAllocations(newNumAllocations); + + // Basic sanity check + if (newNumAllocations < 0) { + throw new SchedulerInvalidResoureRequestException( + "numAllocation in ResourceSizing field must be >= 0, " + + "updating schedulingRequest failed."); + } + + // Ok, now everything is same except numAllocation, update numAllocation. + LOG.info( + "Update numAllocation from old=" + existingNumAllocations + " to new=" + + newNumAllocations); + return new PendingAskUpdateResult( + new PendingAsk(schedulingRequest.getResourceSizing()), + new PendingAsk(newSchedulingRequest.getResourceSizing()), + targetNodePartition, targetNodePartition); + } + + // For a new schedulingRequest, we need to validate if we support its asks. + // This will update internal partitions, etc. after the SchedulingRequest is + // valid. + validateAndSetSchedulingRequest(newSchedulingRequest); + + return new PendingAskUpdateResult(null, + new PendingAsk(newSchedulingRequest.getResourceSizing()), null, + targetNodePartition); + } + + @Override + public PendingAskUpdateResult updatePendingAsk( + SchedulerRequestKey schedulerRequestKey, + SchedulingRequest newSchedulingRequest, + boolean recoverPreemptedRequestForAContainer) { + writeLock.lock(); + try { + return internalUpdatePendingAsk(schedulerRequestKey, + newSchedulingRequest); + } finally { + writeLock.unlock(); + } + } + + private String throwExceptionWithMetaInfo(String message) { + StringBuilder sb = new StringBuilder(); + sb.append("AppId=").append(appSchedulingInfo.getApplicationId()).append( + " Key=").append(this.schedulerRequestKey).append(". 
Exception message:") + .append(message); + throw new SchedulerInvalidResoureRequestException(sb.toString()); + } + + private void validateAndSetSchedulingRequest(SchedulingRequest newSchedulingRequest) + throws SchedulerInvalidResoureRequestException { + // Assume sizing is already checked and normalized by Scheduler. + // ... + + PlacementConstraint constraint = + newSchedulingRequest.getPlacementConstraint(); + + // We only accept SingleConstraint + PlacementConstraint.AbstractConstraint ac = constraint.getConstraintExpr(); + if (!(ac instanceof PlacementConstraint.SingleConstraint)) { + throwExceptionWithMetaInfo( + "Only accepts " + PlacementConstraint.SingleConstraint.class.getName() + + " as constraint-expression. Rejecting the new added " + + "constraint-expression.class=" + ac.getClass().getName()); + } + + PlacementConstraint.SingleConstraint singleConstraint = + (PlacementConstraint.SingleConstraint) ac; + + // Make sure it is an anti-affinity request (actually this implementation + // should be able to support both affinity / anti-affinity without much + // effort. Considering potential test effort required. Limit to + // anti-affinity to intra-app and scope is node. + if (!singleConstraint.getScope().equals(PlacementConstraints.NODE)) { + throwExceptionWithMetaInfo( + "Only support scope=" + PlacementConstraints.NODE + + "now. 
PlacementConstraint=" + singleConstraint); + } + + if (singleConstraint.getMinCardinality() != 0 + || singleConstraint.getMaxCardinality() != 1) { + throwExceptionWithMetaInfo( + "Only support anti-affinity, which is: minCardinality=0, " + + "maxCardinality=1"); + } + + Set targetExpressionSet = + singleConstraint.getTargetExpressions(); + if (targetExpressionSet == null || targetExpressionSet.isEmpty()) { + throwExceptionWithMetaInfo( + "TargetExpression should not be null or empty"); + } + + // Set node partition + String nodePartition = null; + + // Target allocation tags + Set targetAllocationTags = null; + + // Target App Id + boolean targetAppIdChecked = false; + + for (PlacementConstraint.TargetExpression targetExpression : targetExpressionSet) { + if (targetExpression.getTargetType().equals( + PlacementConstraint.TargetExpression.TargetType.SELF)) { + throwExceptionWithMetaInfo("SELF target type is not supported"); + } + + // Handle node partition + if (targetExpression.getTargetType().equals( + PlacementConstraint.TargetExpression.TargetType.NODE_ATTRIBUTE)) { + // For node attribute target, we only support Partition now. And once + // YARN-3409 is merged, we will support node attribute. + if (!targetExpression.getTargetKey().equals(NODE_PARTITION_KEY)) { + throwExceptionWithMetaInfo("When TargetType=" + + PlacementConstraint.TargetExpression.TargetType.NODE_ATTRIBUTE + + " only " + NODE_PARTITION_KEY + " is accepted as TargetKey."); + } + + if (nodePartition != null) { + // This means we have duplicated node partition entry inside placement + // constraint, which might be set by mistake. 
+ throwExceptionWithMetaInfo( + "Only one node partition targetExpression is allowed"); + } + + Set values = targetExpression.getTargetValues(); + if (values == null || values.isEmpty()) { + nodePartition = RMNodeLabelsManager.NO_LABEL; + continue; + } + + if (values.size() > 1) { + throwExceptionWithMetaInfo("Inside one targetExpression, we only " + + "support affinity to at most one node partition now"); + } + + nodePartition = values.iterator().next(); + continue; + } + + // Handle allocation tags + if (targetExpression.getTargetType().equals( + PlacementConstraint.TargetExpression.TargetType.ALLOCATION_TAG)) { + if (targetAllocationTags != null) { + // This means we have duplicated AllocationTag expressions entries + // inside placement constraint, which might be set by mistake. + throwExceptionWithMetaInfo( + "Only one AllocationTag targetExpression is allowed"); + } + + targetAllocationTags = new HashSet<>( + targetExpression.getTargetValues()); + + // Make sure we have one and only one target is set to our own app id + for (String tag : targetAllocationTags) { + if (tag.startsWith(APP_ID)) { + String appId = tag.substring(APP_ID.length()); + String ownAppIdStr = appSchedulingInfo.getApplicationId().toString(); + if (appId.equals(ownAppIdStr)) { + targetAppIdChecked = true; + } else{ + throwExceptionWithMetaInfo( + "Only allow intra-app anti-affinity now, this target " + + "expression includes a target allocation_tag=" + tag + + ", we only expect has a tag: [" + APP_ID + ownAppIdStr + + "]"); + } + } + } + } + } + + if (targetAllocationTags == null || !targetAppIdChecked) { + throwExceptionWithMetaInfo("Failed to find allocation tags from " + + "TargetExpressions or couldn't find self-app target."); + } + + if (nodePartition == null) { + nodePartition = RMNodeLabelsManager.NO_LABEL; + } + + // Validation is done. 
set local results: + this.targetNodePartition = nodePartition; + this.targetAllocationTags = targetAllocationTags; + this.schedulingRequest = newSchedulingRequest; + + LOG.info("Successfully added SchedulingRequest to app=" + appSchedulingInfo + .getApplicationAttemptId() + " targetAllocationTags=[" + StringUtils + .join(",", targetAllocationTags) + "]. nodePartition=" + + targetNodePartition); + } + + @Override + @SuppressWarnings("unchecked") + public Map getResourceRequests() { + return Collections.EMPTY_MAP; + } + + @Override + public PendingAsk getPendingAsk(String resourceName) { + if (resourceName.equals("*") && schedulingRequest != null) { + return new PendingAsk(schedulingRequest.getResourceSizing()); + } + return PendingAsk.ZERO; + } + + @Override + public int getOutstandingAsksCount(String resourceName) { + if (resourceName.equals("*") && schedulingRequest != null) { + return schedulingRequest.getResourceSizing().getNumAllocations(); + } + return 0; + } + + @Override + public ContainerRequest allocate(SchedulerRequestKey schedulerKey, + NodeType type, SchedulerNode node) { + // TODO + return null; + } + + @Override + public boolean canAllocate(NodeType type, SchedulerNode node) { + return true; + } + + @Override + public boolean canDelayTo(String resourceName) { + return true; + } + + @Override + public boolean acceptNodePartition(String nodePartition, + SchedulingMode schedulingMode) { + // We will only look at node label = nodeLabelToLookAt according to + // schedulingMode and partition of node. 
+ String nodePartitionToLookAt; + if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) { + nodePartitionToLookAt = nodePartition; + } else{ + nodePartitionToLookAt = RMNodeLabelsManager.NO_LABEL; + } + + return this.targetNodePartition.equals(nodePartitionToLookAt); + } + + @Override + public String getPrimaryRequestedNodePartition() { + return targetNodePartition; + } + + @Override + public int getUniqueLocationAsks() { + return 1; + } + + @Override + public void showRequests() { + if (schedulingRequest != null) { + LOG.info(schedulingRequest.toString()); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java index fbde6813d5c..7d1140d1d70 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java @@ -331,8 +331,7 @@ private synchronized void addResourceRequest( // Get resources from the ResourceManager Allocation allocation = resourceManager.getResourceScheduler().allocate( - applicationAttemptId, new ArrayList(ask), - new ArrayList(), null, null, + applicationAttemptId, new ArrayList(ask), null, new ArrayList(), null, null, new ContainerUpdates()); if (LOG.isInfoEnabled()) { @@ -431,7 +430,7 @@ private void updateResourceRequests(Map requests, if (type == NodeType.NODE_LOCAL) { for (String host : task.getHosts()) { if(LOG.isDebugEnabled()) { - LOG.debug("updatePendingAsk:" + " application=" + applicationId + LOG.debug("updateResourceDemands:" + " application=" + applicationId 
+ " type=" + type + " host=" + host + " request=" + ((requests == null) ? "null" : requests.get(host))); } @@ -442,7 +441,7 @@ private void updateResourceRequests(Map requests, if (type == NodeType.NODE_LOCAL || type == NodeType.RACK_LOCAL) { for (String rack : task.getRacks()) { if(LOG.isDebugEnabled()) { - LOG.debug("updatePendingAsk:" + " application=" + applicationId + LOG.debug("updateResourceDemands:" + " application=" + applicationId + " type=" + type + " rack=" + rack + " request=" + ((requests == null) ? "null" : requests.get(rack))); } @@ -453,7 +452,7 @@ private void updateResourceRequests(Map requests, updateResourceRequest(requests.get(ResourceRequest.ANY)); if(LOG.isDebugEnabled()) { - LOG.debug("updatePendingAsk:" + " application=" + applicationId + LOG.debug("updateResourceDemands:" + " application=" + applicationId + " #asks=" + ask.size()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index 9dd57034438..9eb63261201 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -466,7 +466,7 @@ private void testAppAttemptScheduledState() { assertEquals(expectedState, applicationAttempt.getAppAttemptState()); verify(scheduler, times(expectedAllocateCount)).allocate( - any(ApplicationAttemptId.class), any(List.class), any(List.class), + 
any(ApplicationAttemptId.class), any(List.class), null, any(List.class), any(List.class), any(List.class), any(ContainerUpdates.class)); assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); @@ -487,7 +487,7 @@ private void testAppAttemptAllocatedState(Container amContainer) { // Check events verify(applicationMasterLauncher).handle(any(AMLauncherEvent.class)); verify(scheduler, times(2)).allocate(any(ApplicationAttemptId.class), - any(List.class), any(List.class), any(List.class), any(List.class), + any(List.class), null, any(List.class), any(List.class), any(List.class), any(ContainerUpdates.class)); verify(nmTokenManager).clearNodeSetForAttempt( applicationAttempt.getAppAttemptId()); @@ -635,7 +635,7 @@ private Container allocateApplicationAttempt() { when(allocation.getContainers()). thenReturn(Collections.singletonList(container)); when(scheduler.allocate(any(ApplicationAttemptId.class), any(List.class), - any(List.class), any(List.class), any(List.class), + null, any(List.class), any(List.class), any(List.class), any(ContainerUpdates.class))). thenReturn(allocation); RMContainer rmContainer = mock(RMContainerImpl.class); @@ -1136,7 +1136,7 @@ public void testLaunchedFailWhileAHSEnabled() { when(allocation.getContainers()). 
thenReturn(Collections.singletonList(amContainer)); when(scheduler.allocate(any(ApplicationAttemptId.class), any(List.class), - any(List.class), any(List.class), any(List.class), + null, any(List.class), any(List.class), any(List.class), any(ContainerUpdates.class))) .thenReturn(allocation); RMContainer rmContainer = mock(RMContainerImpl.class); @@ -1611,7 +1611,7 @@ public void testContainersCleanupForLastAttempt() { public void testScheduleTransitionReplaceAMContainerRequestWithDefaults() { YarnScheduler mockScheduler = mock(YarnScheduler.class); when(mockScheduler.allocate(any(ApplicationAttemptId.class), - any(List.class), any(List.class), any(List.class), any(List.class), + any(List.class), null, any(List.class), any(List.class), any(List.class), any(ContainerUpdates.class))) .thenAnswer(new Answer() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index e91f7341a0a..3dd29bff8c3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -103,7 +103,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; -import 
org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; @@ -1115,12 +1114,12 @@ public void testBlackListNodes() throws Exception { cs.handle(addAttemptEvent); // Verify the blacklist can be updated independent of requesting containers - cs.allocate(appAttemptId, Collections.emptyList(), + cs.allocate(appAttemptId, Collections.emptyList(), null, Collections.emptyList(), Collections.singletonList(host), null, NULL_UPDATE_REQUESTS); Assert.assertTrue(cs.getApplicationAttempt(appAttemptId) .isPlaceBlacklisted(host)); - cs.allocate(appAttemptId, Collections.emptyList(), + cs.allocate(appAttemptId, Collections.emptyList(), null, Collections.emptyList(), null, Collections.singletonList(host), NULL_UPDATE_REQUESTS); Assert.assertFalse(cs.getApplicationAttempt(appAttemptId) @@ -1216,8 +1215,7 @@ public void testAllocateReorder() throws Exception { //This will allocate for app1 cs.allocate(appAttemptId1, - Collections.singletonList(r1), - Collections.emptyList(), + Collections.singletonList(r1), null, Collections.emptyList(), null, null, NULL_UPDATE_REQUESTS); //And this will result in container assignment for app1 @@ -1233,8 +1231,7 @@ public void testAllocateReorder() throws Exception { //Now, allocate for app2 (this would be the first/AM allocation) ResourceRequest r2 = TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true, priority, recordFactory); cs.allocate(appAttemptId2, - Collections.singletonList(r2), - Collections.emptyList(), + Collections.singletonList(r2), null, Collections.emptyList(), null, null, NULL_UPDATE_REQUESTS); //In this case we do not perform container assignment because we want to @@ -3706,7 +3703,7 @@ public void testApplicationHeadRoom() throws Exception { Allocation allocate = 
cs.allocate(appAttemptId, Collections. emptyList(), - Collections. emptyList(), null, null, + null, Collections. emptyList(), null, null, NULL_UPDATE_REQUESTS); Assert.assertNotNull(attempt); @@ -3723,7 +3720,7 @@ public void testApplicationHeadRoom() throws Exception { allocate = cs.allocate(appAttemptId, Collections. emptyList(), - Collections. emptyList(), null, null, + null, Collections. emptyList(), null, null, NULL_UPDATE_REQUESTS); // All resources should be sent as headroom @@ -4249,8 +4246,7 @@ public void testCSReservationWithRootUnblocked() throws Exception { y1Req = TestUtils.createResourceRequest( ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory); cs.allocate(appAttemptId3, - Collections.singletonList(y1Req), - Collections.emptyList(), + Collections.singletonList(y1Req), null, Collections.emptyList(), null, null, NULL_UPDATE_REQUESTS); CapacityScheduler.schedule(cs); } @@ -4263,8 +4259,7 @@ public void testCSReservationWithRootUnblocked() throws Exception { x1Req = TestUtils.createResourceRequest( ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory); cs.allocate(appAttemptId1, - Collections.singletonList(x1Req), - Collections.emptyList(), + Collections.singletonList(x1Req), null, Collections.emptyList(), null, null, NULL_UPDATE_REQUESTS); CapacityScheduler.schedule(cs); } @@ -4276,8 +4271,7 @@ public void testCSReservationWithRootUnblocked() throws Exception { x2Req = TestUtils.createResourceRequest( ResourceRequest.ANY, 2 * GB, 1, true, priority, recordFactory); cs.allocate(appAttemptId2, - Collections.singletonList(x2Req), - Collections.emptyList(), + Collections.singletonList(x2Req), null, Collections.emptyList(), null, null, NULL_UPDATE_REQUESTS); CapacityScheduler.schedule(cs); assertEquals("X2 Used Resource should be 0", 0, @@ -4288,8 +4282,7 @@ public void testCSReservationWithRootUnblocked() throws Exception { x1Req = TestUtils.createResourceRequest( ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory); 
cs.allocate(appAttemptId1, - Collections.singletonList(x1Req), - Collections.emptyList(), + Collections.singletonList(x1Req), null, Collections.emptyList(), null, null, NULL_UPDATE_REQUESTS); CapacityScheduler.schedule(cs); assertEquals("X1 Used Resource should be 7 GB", 7 * GB, @@ -4302,8 +4295,7 @@ public void testCSReservationWithRootUnblocked() throws Exception { y1Req = TestUtils.createResourceRequest( ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory); cs.allocate(appAttemptId3, - Collections.singletonList(y1Req), - Collections.emptyList(), + Collections.singletonList(y1Req), null, Collections.emptyList(), null, null, NULL_UPDATE_REQUESTS); CapacityScheduler.schedule(cs); } @@ -4362,7 +4354,7 @@ public void testCSQueueBlocked() throws Exception { ResourceRequest.ANY, 2 * GB, 1, true, priority, recordFactory); //This will allocate for app1 cs.allocate(appAttemptId1, Collections.singletonList(r1), - Collections.emptyList(), + null, Collections.emptyList(), null, null, NULL_UPDATE_REQUESTS).getContainers().size(); CapacityScheduler.schedule(cs); ResourceRequest r2 = null; @@ -4370,8 +4362,7 @@ public void testCSQueueBlocked() throws Exception { r2 = TestUtils.createResourceRequest( ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory); cs.allocate(appAttemptId2, - Collections.singletonList(r2), - Collections.emptyList(), + Collections.singletonList(r2), null, Collections.emptyList(), null, null, NULL_UPDATE_REQUESTS); CapacityScheduler.schedule(cs); } @@ -4384,12 +4375,12 @@ public void testCSQueueBlocked() throws Exception { r2 = TestUtils.createResourceRequest( ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory); cs.allocate(appAttemptId1, Collections.singletonList(r1), - Collections.emptyList(), + null, Collections.emptyList(), null, null, NULL_UPDATE_REQUESTS).getContainers().size(); CapacityScheduler.schedule(cs); cs.allocate(appAttemptId2, Collections.singletonList(r2), - Collections.emptyList(), null, null, 
NULL_UPDATE_REQUESTS); + null, Collections.emptyList(), null, null, NULL_UPDATE_REQUESTS); CapacityScheduler.schedule(cs); //Check blocked Resource assertEquals("A Used Resource should be 2 GB", 2 * GB, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java index a3b88c03e21..01d5e6c38f2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java @@ -170,7 +170,7 @@ public void testReinitializeStoppedAutoCreatedLeafQueue() throws Exception { 1 * GB, 1, true, priority, recordFactory); cs.allocate(appAttemptId, Collections.singletonList(r1), - Collections.emptyList(), Collections.singletonList(host), + null, Collections.emptyList(), Collections.singletonList(host), null, NULL_UPDATE_REQUESTS); //And this will result in container assignment for app1 diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java index f1d5663523d..90da009ceef 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsManager.java @@ -20,10 +20,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; -import java.util.List; - +import com.google.common.collect.ImmutableSet; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.resource.PlacementTagsConstants; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -33,7 +33,7 @@ import org.junit.Before; import org.junit.Test; -import com.google.common.collect.ImmutableSet; +import java.util.List; /** * Test functionality of AllocationTagsManager. 
@@ -145,13 +145,20 @@ public void testAllocationTagsManagerSimpleCases() .assertEquals(2, atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), TestUtils.getMockApplicationId(1), - ImmutableSet.of(AllocationTagsNamespaces.APP_ID + ImmutableSet.of(PlacementTagsConstants.APP_ID + TestUtils.getMockApplicationId(1).toString()), Long::max)); // Get Node Cardinality of app1 on node2, with empty tag set, op=max Assert.assertEquals(2, - atm.getNodeCardinalityByOp(NodeId.fromString("host2:123"), + atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"), + TestUtils.getMockApplicationId(1), ImmutableSet + .of(PlacementTagsConstants.APP_ID + TestUtils + .getMockApplicationId(1).toString()), Long::max)); + + // Get Cardinality of app1 on node2, with empty tag set, op=max + Assert.assertEquals(2, + atm.getNodeCardinalityByOp(NodeId.fromString("node2:1234"), TestUtils.getMockApplicationId(1), ImmutableSet.of(), Long::max)); // Get Node Cardinality of all apps on node2, with empty tag set, op=sum diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index af4e1dd32a0..3b9b310fcf1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -188,7 +188,7 @@ protected ApplicationAttemptId createSchedulingRequest( resourceManager.getRMContext().getRMApps() .put(id.getApplicationId(), rmApp); - 
scheduler.allocate(id, ask, new ArrayList(), + scheduler.allocate(id, ask, null, new ArrayList(), null, null, NULL_UPDATE_REQUESTS); scheduler.update(); return id; @@ -216,7 +216,7 @@ protected ApplicationAttemptId createSchedulingRequest(String queueId, resourceManager.getRMContext().getRMApps() .put(id.getApplicationId(), rmApp); - scheduler.allocate(id, ask, new ArrayList(), + scheduler.allocate(id, ask, null, new ArrayList(), null, null, NULL_UPDATE_REQUESTS); return id; } @@ -239,7 +239,7 @@ protected void createSchedulingRequestExistingApplication( ResourceRequest request, ApplicationAttemptId attId) { List ask = new ArrayList(); ask.add(request); - scheduler.allocate(attId, ask, new ArrayList(), + scheduler.allocate(attId, ask, null, new ArrayList(), null, null, NULL_UPDATE_REQUESTS); scheduler.update(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java index 854a65c2831..46ebb3655ba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java @@ -122,7 +122,7 @@ public void testBasic() throws InterruptedException { List ask = new ArrayList<>(); ask.add(createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true)); scheduler.allocate( - appAttemptId, ask, new ArrayList(), + appAttemptId, ask, null, new ArrayList(), null, null, NULL_UPDATE_REQUESTS); FSAppAttempt app = 
scheduler.getSchedulerApp(appAttemptId); @@ -160,8 +160,7 @@ public void testSortedNodes() throws Exception { ResourceRequest request = createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true); ask.add(request); - scheduler.allocate(appAttemptId, ask, - new ArrayList(), null, null, NULL_UPDATE_REQUESTS); + scheduler.allocate(appAttemptId, ask, null, new ArrayList(), null, null, NULL_UPDATE_REQUESTS); triggerSchedulingAttempt(); FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId); @@ -172,8 +171,7 @@ public void testSortedNodes() throws Exception { createResourceRequest(1024, 1, ResourceRequest.ANY, 2, 1, true); ask.clear(); ask.add(request); - scheduler.allocate(appAttemptId, ask, - new ArrayList(), null, null, NULL_UPDATE_REQUESTS); + scheduler.allocate(appAttemptId, ask, null, new ArrayList(), null, null, NULL_UPDATE_REQUESTS); triggerSchedulingAttempt(); checkAppConsumption(app, Resources.createResource(2048,2)); @@ -366,7 +364,7 @@ public void testFairSchedulerContinuousSchedulingInitTime() throws Exception { true); ask1.add(request1); ask1.add(request2); - scheduler.allocate(id11, ask1, new ArrayList(), null, null, + scheduler.allocate(id11, ask1, null, new ArrayList(), null, null, NULL_UPDATE_REQUESTS); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index cd0570a5701..bc658020bd8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -1278,7 +1278,7 @@ public void testRackLocalAppReservationThreshold() throws Exception { List asks = new ArrayList(); asks.add(createResourceRequest(2048, node2.getRackName(), 1, 1, false)); - scheduler.allocate(attemptId, asks, new ArrayList(), null, + scheduler.allocate(attemptId, asks, null, new ArrayList(), null, null, NULL_UPDATE_REQUESTS); ApplicationAttemptId attId = createSchedulingRequest(2048, "queue1", "user1", 1); @@ -2123,7 +2123,7 @@ public void testQueueDemandCalculation() throws Exception { ResourceRequest request1 = createResourceRequest(minReqSize * 2, ResourceRequest.ANY, 1, 1, true); ask1.add(request1); - scheduler.allocate(id11, ask1, new ArrayList(), + scheduler.allocate(id11, ask1, null, new ArrayList(), null, null, NULL_UPDATE_REQUESTS); // Second ask, queue2 requests 1 large. @@ -2139,7 +2139,7 @@ public void testQueueDemandCalculation() throws Exception { ResourceRequest.ANY, 1, 1, false); ask2.add(request2); ask2.add(request3); - scheduler.allocate(id21, ask2, new ArrayList(), + scheduler.allocate(id21, ask2, null, new ArrayList(), null, null, NULL_UPDATE_REQUESTS); // Third ask, queue2 requests 2 small (minReqSize). 
@@ -2155,7 +2155,7 @@ public void testQueueDemandCalculation() throws Exception { ResourceRequest.ANY, 2, 2, true); ask3.add(request4); ask3.add(request5); - scheduler.allocate(id22, ask3, new ArrayList(), + scheduler.allocate(id22, ask3, null, new ArrayList(), null, null, NULL_UPDATE_REQUESTS); scheduler.update(); @@ -2681,7 +2681,7 @@ public void testReservationWithMultiplePriorities() throws IOException { // Complete the first container so we can trigger allocation for app2 ContainerId containerId = app1.getLiveContainers().iterator().next().getContainerId(); - scheduler.allocate(app1.getApplicationAttemptId(), new ArrayList<>(), + scheduler.allocate(app1.getApplicationAttemptId(), new ArrayList<>(), null, Arrays.asList(containerId), null, null, NULL_UPDATE_REQUESTS); // Trigger allocation for app2 @@ -2767,7 +2767,7 @@ public void testMultipleNodesSingleRackRequest() throws Exception { asks.add(createResourceRequest(1024, node3.getRackName(), 1, 1, true)); asks.add(createResourceRequest(1024, ResourceRequest.ANY, 1, 2, true)); - scheduler.allocate(attemptId, asks, new ArrayList(), null, + scheduler.allocate(attemptId, asks, null, new ArrayList(), null, null, NULL_UPDATE_REQUESTS); // node 1 checks in @@ -3214,7 +3214,7 @@ public void testCancelStrictLocality() throws IOException { createResourceRequest(1024, node1.getHostName(), 1, 0, true), createResourceRequest(1024, "rack1", 1, 0, true), createResourceRequest(1024, ResourceRequest.ANY, 1, 1, true)); - scheduler.allocate(attId1, update, new ArrayList(), + scheduler.allocate(attId1, update, null, new ArrayList(), null, null, NULL_UPDATE_REQUESTS); // then node2 should get the container @@ -4429,7 +4429,7 @@ public void testSchedulingOnRemovedNode() throws Exception { createResourceRequest(1024, 8, ResourceRequest.ANY, 1, 1, true); ask1.add(request1); - scheduler.allocate(id11, ask1, new ArrayList(), null, + scheduler.allocate(id11, ask1, null, new ArrayList(), null, null, NULL_UPDATE_REQUESTS); String hostName 
= "127.0.0.1"; @@ -4505,11 +4505,11 @@ public void testBlacklistNodes() throws Exception { // Verify the blacklist can be updated independent of requesting containers scheduler.allocate(appAttemptId, Collections.emptyList(), - Collections.emptyList(), + null, Collections.emptyList(), Collections.singletonList(host), null, NULL_UPDATE_REQUESTS); assertTrue(app.isPlaceBlacklisted(host)); scheduler.allocate(appAttemptId, Collections.emptyList(), - Collections.emptyList(), null, + null, Collections.emptyList(), null, Collections.singletonList(host), NULL_UPDATE_REQUESTS); assertFalse(scheduler.getSchedulerApp(appAttemptId) .isPlaceBlacklisted(host)); @@ -4518,8 +4518,7 @@ public void testBlacklistNodes() throws Exception { createResourceRequest(GB, node.getHostName(), 1, 0, true)); // Verify a container does not actually get placed on the blacklisted host - scheduler.allocate(appAttemptId, update, - Collections.emptyList(), + scheduler.allocate(appAttemptId, update, null, Collections.emptyList(), Collections.singletonList(host), null, NULL_UPDATE_REQUESTS); assertTrue(app.isPlaceBlacklisted(host)); scheduler.update(); @@ -4528,8 +4527,7 @@ public void testBlacklistNodes() throws Exception { .getLiveContainers().size()); // Verify a container gets placed on the empty blacklist - scheduler.allocate(appAttemptId, update, - Collections.emptyList(), null, + scheduler.allocate(appAttemptId, update, null, Collections.emptyList(), null, Collections.singletonList(host), NULL_UPDATE_REQUESTS); assertFalse(app.isPlaceBlacklisted(host)); createSchedulingRequest(GB, "root.default", "user", 1); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index 
db749ac90c6..8814c0e542d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -281,7 +281,7 @@ public void testNodeLocalAssignment() throws Exception { ask.add(nodeLocal); ask.add(rackLocal); ask.add(any); - scheduler.allocate(appAttemptId, ask, new ArrayList(), + scheduler.allocate(appAttemptId, ask, null, new ArrayList(), null, null, NULL_UPDATE_REQUESTS); NodeUpdateSchedulerEvent node0Update = new NodeUpdateSchedulerEvent(node0); @@ -378,7 +378,7 @@ public void testUpdateResourceOnNode() throws Exception { ask.add(nodeLocal); ask.add(rackLocal); ask.add(any); - scheduler.allocate(appAttemptId, ask, new ArrayList(), + scheduler.allocate(appAttemptId, ask, null, new ArrayList(), null, null, NULL_UPDATE_REQUESTS); // Before the node update event, there are one local request @@ -954,7 +954,7 @@ public void testBlackListNodes() throws Exception { ask1.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0), ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1, RMNodeLabelsManager.NO_LABEL)); - fs.allocate(appAttemptId1, ask1, emptyId, + fs.allocate(appAttemptId1, ask1, null, emptyId, Collections.singletonList(host_1_0), null, NULL_UPDATE_REQUESTS); // Trigger container assignment @@ -963,7 +963,7 @@ public void testBlackListNodes() throws Exception { // Get the allocation for the application and verify no allocation on // blacklist node Allocation allocation1 = - fs.allocate(appAttemptId1, emptyAsk, emptyId, + fs.allocate(appAttemptId1, emptyAsk, null, emptyId, null, null, NULL_UPDATE_REQUESTS); Assert.assertEquals("allocation1", 0, allocation1.getContainers().size()); @@ -971,7 +971,7 @@ public void testBlackListNodes() 
throws Exception { // verify host_1_1 can get allocated as not in blacklist fs.handle(new NodeUpdateSchedulerEvent(n4)); Allocation allocation2 = - fs.allocate(appAttemptId1, emptyAsk, emptyId, + fs.allocate(appAttemptId1, emptyAsk, null, emptyId, null, null, NULL_UPDATE_REQUESTS); Assert.assertEquals("allocation2", 1, allocation2.getContainers().size()); List containerList = allocation2.getContainers(); @@ -986,33 +986,33 @@ public void testBlackListNodes() throws Exception { // be assigned ask2.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0), ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1)); - fs.allocate(appAttemptId1, ask2, emptyId, + fs.allocate(appAttemptId1, ask2, null, emptyId, Collections.singletonList("rack0"), null, NULL_UPDATE_REQUESTS); // verify n1 is not qualified to be allocated fs.handle(new NodeUpdateSchedulerEvent(n1)); Allocation allocation3 = - fs.allocate(appAttemptId1, emptyAsk, emptyId, + fs.allocate(appAttemptId1, emptyAsk, null, emptyId, null, null, NULL_UPDATE_REQUESTS); Assert.assertEquals("allocation3", 0, allocation3.getContainers().size()); // verify n2 is not qualified to be allocated fs.handle(new NodeUpdateSchedulerEvent(n2)); Allocation allocation4 = - fs.allocate(appAttemptId1, emptyAsk, emptyId, + fs.allocate(appAttemptId1, emptyAsk, null, emptyId, null, null, NULL_UPDATE_REQUESTS); Assert.assertEquals("allocation4", 0, allocation4.getContainers().size()); // verify n3 is not qualified to be allocated fs.handle(new NodeUpdateSchedulerEvent(n3)); Allocation allocation5 = - fs.allocate(appAttemptId1, emptyAsk, emptyId, + fs.allocate(appAttemptId1, emptyAsk, null, emptyId, null, null, NULL_UPDATE_REQUESTS); Assert.assertEquals("allocation5", 0, allocation5.getContainers().size()); fs.handle(new NodeUpdateSchedulerEvent(n4)); Allocation allocation6 = - fs.allocate(appAttemptId1, emptyAsk, emptyId, + fs.allocate(appAttemptId1, emptyAsk, null, emptyId, null, null, NULL_UPDATE_REQUESTS); 
Assert.assertEquals("allocation6", 1, allocation6.getContainers().size()); @@ -1072,14 +1072,14 @@ public void testHeadroom() throws Exception { List ask1 = new ArrayList(); ask1.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0), ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1)); - fs.allocate(appAttemptId1, ask1, emptyId, + fs.allocate(appAttemptId1, ask1, null, emptyId, null, null, NULL_UPDATE_REQUESTS); // Ask for a 2 GB container for app 2 List ask2 = new ArrayList(); ask2.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0), ResourceRequest.ANY, BuilderUtils.newResource(2 * GB, 1), 1)); - fs.allocate(appAttemptId2, ask2, emptyId, + fs.allocate(appAttemptId2, ask2, null, emptyId, null, null, NULL_UPDATE_REQUESTS); // Trigger container assignment @@ -1087,13 +1087,13 @@ public void testHeadroom() throws Exception { // Get the allocation for the applications and verify headroom Allocation allocation1 = - fs.allocate(appAttemptId1, emptyAsk, emptyId, + fs.allocate(appAttemptId1, emptyAsk, null, emptyId, null, null, NULL_UPDATE_REQUESTS); Assert.assertEquals("Allocation headroom", 1 * GB, allocation1 .getResourceLimit().getMemorySize()); Allocation allocation2 = - fs.allocate(appAttemptId2, emptyAsk, emptyId, + fs.allocate(appAttemptId2, emptyAsk, null, emptyId, null, null, NULL_UPDATE_REQUESTS); Assert.assertEquals("Allocation headroom", 1 * GB, allocation2 .getResourceLimit().getMemorySize());