diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java index e46647a609c..2935e5f2740 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java @@ -585,14 +585,12 @@ public int hashCode() { Resource capability = getCapability(); String hostName = getResourceName(); Priority priority = getPriority(); - ProfileCapability profile = getProfileCapability(); result = prime * result + ((capability == null) ? 0 : capability.hashCode()); result = prime * result + ((hostName == null) ? 0 : hostName.hashCode()); result = prime * result + getNumContainers(); result = prime * result + ((priority == null) ? 0 : priority.hashCode()); result = prime * result + Long.valueOf(getAllocationRequestId()).hashCode(); - result = prime * result + ((profile == null) ? 0 : profile.hashCode()); return result; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index ef635d33b94..9ab6dbb1445 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -1075,10 +1075,17 @@ private void setAMResourceCapability(ApplicationSubmissionContext appContext, + " application master, exiting. " + "Specified virtual cores=" + amVCores); } - String tmp = amResourceProfile; - if (amResourceProfile.isEmpty()) { - tmp = "default"; + Resource capability = Resource.newInstance(0, 0); + + if (!amResourceProfile.isEmpty()) { + if (!profiles.containsKey(amResourceProfile)) { + throw new IllegalArgumentException( + "Failed to find specified resource profile for application master=" + + amResourceProfile); + } + capability = Resources.clone(profiles.get(amResourceProfile)); } + if (appContext.getAMContainerResourceRequests() == null) { List amResourceRequests = new ArrayList(); amResourceRequests @@ -1087,31 +1094,26 @@ private void setAMResourceCapability(ApplicationSubmissionContext appContext, appContext.setAMContainerResourceRequests(amResourceRequests); } - if (appContext.getAMContainerResourceRequests().get(0) - .getProfileCapability() == null) { - appContext.getAMContainerResourceRequests().get(0).setProfileCapability( - ProfileCapability.newInstance(tmp, Resource.newInstance(0, 0))); - } - - Resource capability = Resource.newInstance(0, 0); - validateResourceTypes(amResources.keySet(), resourceTypes); for (Map.Entry entry : amResources.entrySet()) { capability.setResourceValue(entry.getKey(), entry.getValue()); } // set amMemory because it's used to set Xmx param if (amMemory == -1) { - amMemory = (profiles == null) ? DEFAULT_AM_MEMORY : - profiles.get(tmp).getMemorySize(); + amMemory = DEFAULT_AM_MEMORY; + LOG.warn("AM Memory not specified, use " + DEFAULT_AM_MEMORY + + " mb as AM memory"); } if (amVCores == -1) { - amVCores = (profiles == null) ? DEFAULT_AM_VCORES : - profiles.get(tmp).getVirtualCores(); + amVCores = DEFAULT_AM_VCORES; + LOG.warn("AM vcore not specified, use " + DEFAULT_AM_VCORES + + " mb as AM vcores"); } capability.setMemorySize(amMemory); capability.setVirtualCores(amVCores); - appContext.getAMContainerResourceRequests().get(0).getProfileCapability() - .setProfileCapabilityOverride(capability); + appContext.getAMContainerResourceRequests().get(0).setCapability( + capability); + LOG.warn("AM Resource capability=" + capability); } private void setContainerResources(Map profiles, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java index d3d1974a5d8..276f1f5637b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java @@ -23,6 +23,7 @@ import java.util.function.Supplier; import java.util.List; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -119,7 +120,7 @@ protected AMRMClient(String name) { private String nodeLabelsExpression; private ExecutionTypeRequest executionTypeRequest = ExecutionTypeRequest.newInstance(); - private String resourceProfile = ProfileCapability.DEFAULT_PROFILE; + private String resourceProfile = null; /** * Instantiates a {@link ContainerRequest} with the given constraints and @@ -142,6 +143,13 @@ public ContainerRequest(Resource capability, String[] nodes, this(capability, nodes, racks, priority, true, null); } + @VisibleForTesting + public ContainerRequest(Resource capability, String[] nodes, String[] racks, + Priority priority, String profile) { + this(capability, nodes, racks, priority, 0, true, null, + ExecutionTypeRequest.newInstance(), profile); + } + /** * Instantiates a {@link ContainerRequest} with the given constraints and * locality relaxation enabled. @@ -166,27 +174,7 @@ public ContainerRequest(Resource capability, String[] nodes, this(capability, nodes, racks, priority, allocationRequestId, true, null, ExecutionTypeRequest.newInstance()); } - /** - * Instantiates a {@link ContainerRequest} with the given constraints and - * locality relaxation enabled. - * - * @param capability - * The {@link ProfileCapability} to be requested for each container. - * @param nodes - * Any hosts to request that the containers are placed on. - * @param racks - * Any racks to request that the containers are placed on. The - * racks corresponding to any hosts requested will be automatically - * added to this list. - * @param priority - * The priority at which to request the containers. Higher - * priorities have lower numerical values. - */ - public ContainerRequest(ProfileCapability capability, String[] nodes, - String[] racks, Priority priority) { - this(capability, nodes, racks, priority, 0, true, null); - } - + /** * Instantiates a {@link ContainerRequest} with the given constraints. * @@ -214,29 +202,6 @@ public ContainerRequest(Resource capability, String[] nodes, * Instantiates a {@link ContainerRequest} with the given constraints. * * @param capability - * The {@link ProfileCapability} to be requested for each container. - * @param nodes - * Any hosts to request that the containers are placed on. - * @param racks - * Any racks to request that the containers are placed on. The - * racks corresponding to any hosts requested will be automatically - * added to this list. - * @param priority - * The priority at which to request the containers. Higher - * priorities have lower numerical values. - * @param relaxLocality - * If true, containers for this request may be assigned on hosts - * and racks other than the ones explicitly requested. - */ - public ContainerRequest(ProfileCapability capability, String[] nodes, - String[] racks, Priority priority, boolean relaxLocality) { - this(capability, nodes, racks, priority, 0, relaxLocality, null); - } - - /** - * Instantiates a {@link ContainerRequest} with the given constraints. - * - * @param capability * The {@link Resource} to be requested for each container. * @param nodes * Any hosts to request that the containers are placed on. @@ -324,14 +289,6 @@ public ContainerRequest(Resource capability, String[] nodes, String[] racks, ExecutionTypeRequest.newInstance()); } - public ContainerRequest(ProfileCapability capability, String[] nodes, - String[] racks, Priority priority, long allocationRequestId, - boolean relaxLocality, String nodeLabelsExpression) { - this(capability, nodes, racks, priority, allocationRequestId, - relaxLocality, nodeLabelsExpression, - ExecutionTypeRequest.newInstance()); - } - /** * Instantiates a {@link ContainerRequest} with the given constraints. * @@ -365,9 +322,10 @@ public ContainerRequest(Resource capability, String[] nodes, String[] racks, ExecutionTypeRequest executionTypeRequest) { this(capability, nodes, racks, priority, allocationRequestId, relaxLocality, nodeLabelsExpression, executionTypeRequest, - ProfileCapability.DEFAULT_PROFILE); + null); } + // TODO, remove this one. public ContainerRequest(ProfileCapability capability, String[] nodes, String[] racks, Priority priority, long allocationRequestId, boolean relaxLocality, String nodeLabelsExpression, @@ -376,7 +334,7 @@ public ContainerRequest(ProfileCapability capability, String[] nodes, allocationRequestId, relaxLocality, nodeLabelsExpression, executionTypeRequest, capability.getProfileName()); } - + /** * Instantiates a {@link ContainerRequest} with the given constraints. * @@ -743,7 +701,7 @@ public abstract void requestContainerUpdate( @InterfaceStability.Evolving public List> getMatchingRequests( Priority priority, String resourceName, ExecutionType executionType, - ProfileCapability capability) { + Resource capability, String profile) { throw new UnsupportedOperationException("The sub-class extending" + " AMRMClient is expected to implement this !!"); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index 5507c07fc11..b630e906c6f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.api.records.ProfileCapability; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; @@ -114,14 +115,11 @@ LinkedHashSet containerRequests; ResourceRequestInfo(Long allocationRequestId, Priority priority, - String resourceName, Resource capability, boolean relaxLocality, - String resourceProfile) { - ProfileCapability profileCapability = ProfileCapability - .newInstance(resourceProfile, capability); + String resourceName, Resource capability, boolean relaxLocality) { remoteRequest = ResourceRequest.newBuilder().priority(priority) .resourceName(resourceName).capability(capability).numContainers(0) .allocationRequestId(allocationRequestId).relaxLocality(relaxLocality) - .profileCapability(profileCapability).build(); + .build(); containerRequests = new LinkedHashSet(); } } @@ -130,32 +128,11 @@ * Class compares Resource by memory, then cpu and then the remaining resource * types in reverse order. */ - static class ProfileCapabilityComparator + static class ResourceReverseComparator implements Comparator { - - HashMap resourceProfilesMap; - - public ProfileCapabilityComparator( - HashMap resourceProfileMap) { - this.resourceProfilesMap = resourceProfileMap; + public int compare(Resource res0, Resource res1) { + return res1.compareTo(res0); } - - public int compare(T arg0, T arg1) { - Resource resource0 = - ProfileCapability.toResource(arg0, resourceProfilesMap); - Resource resource1 = - ProfileCapability.toResource(arg1, resourceProfilesMap); - return resource1.compareTo(resource0); - } - } - - boolean canFit(ProfileCapability arg0, ProfileCapability arg1) { - Resource resource0 = - ProfileCapability.toResource(arg0, resourceProfilesMap); - Resource resource1 = - ProfileCapability.toResource(arg1, resourceProfilesMap); - return Resources.fitsIn(resource0, resource1); - } private final Map> remoteRequests = @@ -509,8 +486,6 @@ public void unregisterApplicationMaster(FinalApplicationStatus appStatus, public synchronized void addContainerRequest(T req) { Preconditions.checkArgument(req != null, "Resource request can not be null."); - ProfileCapability profileCapability = ProfileCapability - .newInstance(req.getResourceProfile(), req.getCapability()); Set dedupedRacks = new HashSet(); if (req.getRacks() != null) { dedupedRacks.addAll(req.getRacks()); @@ -523,7 +498,8 @@ public synchronized void addContainerRequest(T req) { Set inferredRacks = resolveRacks(req.getNodes()); inferredRacks.removeAll(dedupedRacks); - checkResourceProfile(req.getResourceProfile()); + Resource resource = checkAndGetResourceProfile(req.getResourceProfile(), + req.getCapability()); // check that specific and non-specific requests cannot be mixed within a // priority @@ -549,26 +525,26 @@ public synchronized void addContainerRequest(T req) { } for (String node : dedupedNodes) { addResourceRequest(req.getPriority(), node, - req.getExecutionTypeRequest(), profileCapability, req, true, + req.getExecutionTypeRequest(), resource, req, true, req.getNodeLabelExpression()); } } for (String rack : dedupedRacks) { addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(), - profileCapability, req, true, req.getNodeLabelExpression()); + resource, req, true, req.getNodeLabelExpression()); } // Ensure node requests are accompanied by requests for // corresponding rack for (String rack : inferredRacks) { addResourceRequest(req.getPriority(), rack, req.getExecutionTypeRequest(), - profileCapability, req, req.getRelaxLocality(), + resource, req, req.getRelaxLocality(), req.getNodeLabelExpression()); } // Off-switch addResourceRequest(req.getPriority(), ResourceRequest.ANY, - req.getExecutionTypeRequest(), profileCapability, req, + req.getExecutionTypeRequest(), resource, req, req.getRelaxLocality(), req.getNodeLabelExpression()); } @@ -576,8 +552,8 @@ public synchronized void addContainerRequest(T req) { public synchronized void removeContainerRequest(T req) { Preconditions.checkArgument(req != null, "Resource request can not be null."); - ProfileCapability profileCapability = ProfileCapability - .newInstance(req.getResourceProfile(), req.getCapability()); + Resource resource = checkAndGetResourceProfile(req.getResourceProfile(), + req.getCapability()); Set allRacks = new HashSet(); if (req.getRacks() != null) { allRacks.addAll(req.getRacks()); @@ -588,17 +564,17 @@ public synchronized void removeContainerRequest(T req) { if (req.getNodes() != null) { for (String node : new HashSet(req.getNodes())) { decResourceRequest(req.getPriority(), node, - req.getExecutionTypeRequest(), profileCapability, req); + req.getExecutionTypeRequest(), resource, req); } } for (String rack : allRacks) { decResourceRequest(req.getPriority(), rack, - req.getExecutionTypeRequest(), profileCapability, req); + req.getExecutionTypeRequest(), resource, req); } decResourceRequest(req.getPriority(), ResourceRequest.ANY, - req.getExecutionTypeRequest(), profileCapability, req); + req.getExecutionTypeRequest(), resource, req); } @Override @@ -693,26 +669,23 @@ public synchronized int getClusterNodeCount() { } @Override - @SuppressWarnings("unchecked") - public synchronized List> getMatchingRequests( - Priority priority, String resourceName, ExecutionType executionType, - Resource capability) { - ProfileCapability profileCapability = - ProfileCapability.newInstance(capability); - return getMatchingRequests(priority, resourceName, executionType, - profileCapability); + public List> getMatchingRequests(Priority priority, + String resourceName, ExecutionType executionType, + Resource capability, String profile) { + capability = checkAndGetResourceProfile(profile, capability); + return getMatchingRequests(priority, resourceName, executionType, capability); } @Override @SuppressWarnings("unchecked") public synchronized List> getMatchingRequests( Priority priority, String resourceName, ExecutionType executionType, - ProfileCapability capability) { + Resource capability) { Preconditions.checkArgument(capability != null, "The Resource to be requested should not be null "); Preconditions.checkArgument(priority != null, "The priority at which to request containers should not be null "); - List> list = new LinkedList>(); + List> list = new LinkedList<>(); RemoteRequestsTable remoteRequestsTable = getTable(0); @@ -724,7 +697,7 @@ public synchronized int getClusterNodeCount() { // If no exact match. Container may be larger than what was requested. // get all resources <= capability. map is reverse sorted. for (ResourceRequestInfo resReqInfo : matchingRequests) { - if (canFit(resReqInfo.remoteRequest.getProfileCapability(), + if (Resources.fitsIn(resReqInfo.remoteRequest.getCapability(), capability) && !resReqInfo.containerRequests.isEmpty()) { list.add(resReqInfo.containerRequests); } @@ -781,13 +754,31 @@ private void checkLocalityRelaxationConflict(Long allocationReqId, } } - private void checkResourceProfile(String profile) { - if (resourceProfilesMap != null && !resourceProfilesMap.isEmpty() - && !resourceProfilesMap.containsKey(profile)) { - throw new InvalidContainerRequestException( - "Invalid profile name, valid profile names are " + resourceProfilesMap - .keySet()); + // When profile and override resource are specified at the same time, override + // predefined resource value in profile if any resource type has a positive + // value. + private Resource checkAndGetResourceProfile(String profile, + Resource overrideResource) { + Resource returnResource = overrideResource; + + // if application requested a non-empty/null profile, and the + if (profile != null && !profile.isEmpty()) { + if (resourceProfilesMap == null || (!resourceProfilesMap.containsKey( + profile))) { + throw new InvalidContainerRequestException( + "Invalid profile name specified=" + profile + + ", valid profile names are " + resourceProfilesMap.keySet()); + } + returnResource = Resources.clone(resourceProfilesMap.get(profile)); + for (ResourceInformation info : overrideResource + .getAllResourcesListCopy()) { + if (info.getValue() > 0) { + returnResource.setResourceInformation(info.getName(), info); + } + } } + + return returnResource; } /** @@ -876,16 +867,12 @@ private void addResourceRequestToAsk(ResourceRequest remoteRequest) { } private void addResourceRequest(Priority priority, String resourceName, - ExecutionTypeRequest execTypeReq, ProfileCapability capability, T req, + ExecutionTypeRequest execTypeReq, Resource capability, T req, boolean relaxLocality, String labelExpression) { RemoteRequestsTable remoteRequestsTable = getTable(req.getAllocationRequestId()); if (remoteRequestsTable == null) { - remoteRequestsTable = new RemoteRequestsTable(); - if (this.resourceProfilesMap instanceof HashMap) { - remoteRequestsTable.setResourceComparator( - new ProfileCapabilityComparator((HashMap) resourceProfilesMap)); - } + remoteRequestsTable = new RemoteRequestsTable<>(); putTable(req.getAllocationRequestId(), remoteRequestsTable); } @SuppressWarnings("unchecked") @@ -908,7 +895,7 @@ private void addResourceRequest(Priority priority, String resourceName, } private void decResourceRequest(Priority priority, String resourceName, - ExecutionTypeRequest execTypeReq, ProfileCapability capability, T req) { + ExecutionTypeRequest execTypeReq, Resource capability, T req) { RemoteRequestsTable remoteRequestsTable = getTable(req.getAllocationRequestId()); if (remoteRequestsTable != null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java index 55271328176..e1b7bb27f45 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/RemoteRequestsTable.java @@ -21,7 +21,7 @@ import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.ProfileCapability; +import org.apache.hadoop.yarn.api.records.Resource; import java.util.Collection; import java.util.HashMap; @@ -33,7 +33,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.ResourceRequestInfo; -import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.ProfileCapabilityComparator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,36 +41,34 @@ private static final Logger LOG = LoggerFactory.getLogger(RemoteRequestsTable.class); - private ProfileCapabilityComparator resourceComparator; - /** * Nested Iterator that iterates over just the ResourceRequestInfo * object. */ class RequestInfoIterator implements Iterator { - private Iterator>>> iLocMap; - private Iterator>> iExecTypeMap; - private Iterator> iCapMap; + private Iterator> iCapMap; private Iterator iResReqInfo; public RequestInfoIterator(Iterator>>> + Map>>> iLocationMap) { this.iLocMap = iLocationMap; if (iLocMap.hasNext()) { iExecTypeMap = iLocMap.next().values().iterator(); } else { iExecTypeMap = - new LinkedList>>().iterator(); } if (iExecTypeMap.hasNext()) { iCapMap = iExecTypeMap.next().values().iterator(); } else { iCapMap = - new LinkedList>() + new LinkedList>() .iterator(); } if (iCapMap.hasNext()) { @@ -113,7 +110,7 @@ public void remove() { // Nest map with Primary key : // Priority -> ResourceName(String) -> ExecutionType -> Capability(Resource) // and value : ResourceRequestInfo - private Map>>> remoteRequestsTable = new HashMap<>(); @Override @@ -122,8 +119,8 @@ public void remove() { } ResourceRequestInfo get(Priority priority, String location, - ExecutionType execType, ProfileCapability capability) { - TreeMap capabilityMap = + ExecutionType execType, Resource capability) { + TreeMap capabilityMap = getCapabilityMap(priority, location, execType); if (capabilityMap == null) { return null; @@ -133,8 +130,8 @@ ResourceRequestInfo get(Priority priority, String location, @SuppressWarnings("unchecked") void put(Priority priority, String resourceName, ExecutionType execType, - ProfileCapability capability, ResourceRequestInfo resReqInfo) { - Map>> locationMap = remoteRequestsTable.get(priority); if (locationMap == null) { @@ -144,7 +141,7 @@ void put(Priority priority, String resourceName, ExecutionType execType, LOG.debug("Added priority=" + priority); } } - Map> + Map> execTypeMap = locationMap.get(resourceName); if (execTypeMap == null) { execTypeMap = new HashMap<>(); @@ -153,15 +150,10 @@ void put(Priority priority, String resourceName, ExecutionType execType, LOG.debug("Added resourceName=" + resourceName); } } - TreeMap capabilityMap = + TreeMap capabilityMap = execTypeMap.get(execType); if (capabilityMap == null) { - // this can happen if the user doesn't register with the RM before - // calling addResourceRequest - if (resourceComparator == null) { - resourceComparator = new ProfileCapabilityComparator(new HashMap<>()); - } - capabilityMap = new TreeMap<>(resourceComparator); + capabilityMap = new TreeMap<>(new AMRMClientImpl.ResourceReverseComparator()); execTypeMap.put(execType, capabilityMap); if (LOG.isDebugEnabled()) { LOG.debug("Added Execution Type=" + execType); @@ -171,9 +163,9 @@ void put(Priority priority, String resourceName, ExecutionType execType, } ResourceRequestInfo remove(Priority priority, String resourceName, - ExecutionType execType, ProfileCapability capability) { + ExecutionType execType, Resource capability) { ResourceRequestInfo retVal = null; - Map>> locationMap = remoteRequestsTable.get(priority); if (locationMap == null) { if (LOG.isDebugEnabled()) { @@ -181,7 +173,7 @@ ResourceRequestInfo remove(Priority priority, String resourceName, } return null; } - Map> + Map> execTypeMap = locationMap.get(resourceName); if (execTypeMap == null) { if (LOG.isDebugEnabled()) { @@ -189,7 +181,7 @@ ResourceRequestInfo remove(Priority priority, String resourceName, } return null; } - TreeMap capabilityMap = + TreeMap capabilityMap = execTypeMap.get(execType); if (capabilityMap == null) { if (LOG.isDebugEnabled()) { @@ -210,14 +202,14 @@ ResourceRequestInfo remove(Priority priority, String resourceName, return retVal; } - Map>> getLocationMap(Priority priority) { return remoteRequestsTable.get(priority); } - Map> + Map> getExecutionTypeMap(Priority priority, String location) { - Map>> locationMap = getLocationMap(priority); if (locationMap == null) { return null; @@ -225,10 +217,10 @@ ResourceRequestInfo remove(Priority priority, String resourceName, return locationMap.get(location); } - TreeMap getCapabilityMap(Priority + TreeMap getCapabilityMap(Priority priority, String location, ExecutionType execType) { - Map> + Map> executionTypeMap = getExecutionTypeMap(priority, location); if (executionTypeMap == null) { return null; @@ -242,7 +234,7 @@ ResourceRequestInfo remove(Priority priority, String resourceName, List retList = new LinkedList<>(); for (String location : locations) { for (ExecutionType eType : ExecutionType.values()) { - TreeMap capabilityMap = + TreeMap capabilityMap = getCapabilityMap(priority, location, eType); if (capabilityMap != null) { retList.addAll(capabilityMap.values()); @@ -254,9 +246,9 @@ ResourceRequestInfo remove(Priority priority, String resourceName, List getMatchingRequests( Priority priority, String resourceName, ExecutionType executionType, - ProfileCapability capability) { + Resource capability) { List list = new LinkedList<>(); - TreeMap capabilityMap = + TreeMap capabilityMap = getCapabilityMap(priority, resourceName, executionType); if (capabilityMap != null) { ResourceRequestInfo resourceRequestInfo = capabilityMap.get(capability); @@ -272,15 +264,14 @@ ResourceRequestInfo remove(Priority priority, String resourceName, @SuppressWarnings("unchecked") ResourceRequestInfo addResourceRequest(Long allocationRequestId, Priority priority, String resourceName, ExecutionTypeRequest execTypeReq, - ProfileCapability capability, T req, boolean relaxLocality, + Resource capability, T req, boolean relaxLocality, String labelExpression) { ResourceRequestInfo resourceRequestInfo = get(priority, resourceName, execTypeReq.getExecutionType(), capability); if (resourceRequestInfo == null) { resourceRequestInfo = new ResourceRequestInfo(allocationRequestId, priority, resourceName, - capability.getProfileCapabilityOverride(), relaxLocality, - capability.getProfileName()); + capability, relaxLocality); put(priority, resourceName, execTypeReq.getExecutionType(), capability, resourceRequestInfo); } @@ -302,7 +293,7 @@ ResourceRequestInfo addResourceRequest(Long allocationRequestId, } ResourceRequestInfo decResourceRequest(Priority priority, String resourceName, - ExecutionTypeRequest execTypeReq, ProfileCapability capability, T req) { + ExecutionTypeRequest execTypeReq, Resource capability, T req) { ResourceRequestInfo resourceRequestInfo = get(priority, resourceName, execTypeReq.getExecutionType(), capability); @@ -339,35 +330,4 @@ ResourceRequestInfo decResourceRequest(Priority priority, String resourceName, boolean isEmpty() { return remoteRequestsTable.isEmpty(); } - - @SuppressWarnings("unchecked") - public void setResourceComparator(ProfileCapabilityComparator comparator) { - ProfileCapabilityComparator old = this.resourceComparator; - this.resourceComparator = comparator; - if (old != null) { - // we've already set a resource comparator - re-create the maps with the - // new one. this is needed in case someone adds container requests before - // registering with the RM. In such a case, the comparator won't have - // the resource profiles map. After registration, the map is available - // so re-create the capabilities maps - - for (Map.Entry>>> - priEntry : remoteRequestsTable.entrySet()) { - for (Map.Entry>> nameEntry : priEntry.getValue().entrySet()) { - for (Map.Entry> execEntry : nameEntry - .getValue().entrySet()) { - Map capabilityMap = - execEntry.getValue(); - TreeMap newCapabiltyMap = - new TreeMap<>(resourceComparator); - newCapabiltyMap.putAll(capabilityMap); - execEntry.setValue(newCapabiltyMap); - } - } - } - } - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index 3ecc5cdf5b7..e9fdb210679 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -618,19 +618,16 @@ public void testAMRMClientMatchStorage() throws YarnException, IOException { amClient.addContainerRequest(storedContainer1); amClient.addContainerRequest(storedContainer2); amClient.addContainerRequest(storedContainer3); - - ProfileCapability profileCapability = - ProfileCapability.newInstance(capability); // test addition and storage RemoteRequestsTable remoteRequestsTable = amClient.getTable(0); int containersRequestedAny = remoteRequestsTable.get(priority, - ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability) + ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) .remoteRequest.getNumContainers(); assertEquals(2, containersRequestedAny); containersRequestedAny = remoteRequestsTable.get(priority1, - ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability) + ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) .remoteRequest.getNumContainers(); assertEquals(1, containersRequestedAny); List> matches = @@ -1333,11 +1330,9 @@ public void testAMRMClientWithContainerPromotion() true, null, ExecutionTypeRequest .newInstance(ExecutionType.OPPORTUNISTIC, true))); - ProfileCapability profileCapability = - ProfileCapability.newInstance(capability); int oppContainersRequestedAny = amClient.getTable(0).get(priority2, ResourceRequest.ANY, - ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest + ExecutionType.OPPORTUNISTIC, capability).remoteRequest .getNumContainers(); assertEquals(1, oppContainersRequestedAny); @@ -1474,11 +1469,9 @@ public void testAMRMClientWithContainerDemotion() true, null, ExecutionTypeRequest .newInstance(ExecutionType.GUARANTEED, true))); - ProfileCapability profileCapability = - ProfileCapability.newInstance(capability); int oppContainersRequestedAny = amClient.getTable(0).get(priority2, ResourceRequest.ANY, - ExecutionType.GUARANTEED, profileCapability).remoteRequest + ExecutionType.GUARANTEED, capability).remoteRequest .getNumContainers(); assertEquals(1, oppContainersRequestedAny); @@ -1858,16 +1851,14 @@ private void assertNumContainers(AMRMClientImpl amClient, int expAsks, int expRelease) { RemoteRequestsTable remoteRequestsTable = amClient.getTable(allocationReqId); - ProfileCapability profileCapability = - ProfileCapability.newInstance(capability); int containersRequestedNode = remoteRequestsTable.get(priority, - node, ExecutionType.GUARANTEED, profileCapability).remoteRequest + node, ExecutionType.GUARANTEED, capability).remoteRequest .getNumContainers(); int containersRequestedRack = remoteRequestsTable.get(priority, - rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest + rack, ExecutionType.GUARANTEED, capability).remoteRequest .getNumContainers(); int containersRequestedAny = remoteRequestsTable.get(priority, - ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability) + ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) .remoteRequest.getNumContainers(); assertEquals(expNode, containersRequestedNode); @@ -2079,31 +2070,20 @@ public void testGetMatchingFitWithProfiles() throws Exception { amClient.start(); amClient.registerApplicationMaster("Host", 10000, ""); - ProfileCapability capability1 = ProfileCapability.newInstance("minimum"); - ProfileCapability capability2 = ProfileCapability.newInstance("default"); - ProfileCapability capability3 = ProfileCapability.newInstance("maximum"); - ProfileCapability capability4 = ProfileCapability - .newInstance("minimum", Resource.newInstance(2048, 1)); - ProfileCapability capability5 = ProfileCapability.newInstance("default"); - ProfileCapability capability6 = ProfileCapability - .newInstance("default", Resource.newInstance(2048, 1)); - // http has the same capabilities as default - ProfileCapability capability7 = ProfileCapability.newInstance("http"); - - ContainerRequest storedContainer1 = - new ContainerRequest(capability1, nodes, racks, priority); - ContainerRequest storedContainer2 = - new ContainerRequest(capability2, nodes, racks, priority); - ContainerRequest storedContainer3 = - new ContainerRequest(capability3, nodes, racks, priority); - ContainerRequest storedContainer4 = - new ContainerRequest(capability4, nodes, racks, priority); - ContainerRequest storedContainer5 = - new ContainerRequest(capability5, nodes, racks, priority2); - ContainerRequest storedContainer6 = - new ContainerRequest(capability6, nodes, racks, priority); - ContainerRequest storedContainer7 = - new ContainerRequest(capability7, nodes, racks, priority); + ContainerRequest storedContainer1 = new ContainerRequest( + Resource.newInstance(0, 0), nodes, racks, priority, "minimum"); + ContainerRequest storedContainer2 = new ContainerRequest( + Resource.newInstance(0, 0), nodes, racks, priority, "default"); + ContainerRequest storedContainer3 = new ContainerRequest( + Resource.newInstance(0, 0), nodes, racks, priority, "maximum"); + ContainerRequest storedContainer4 = new ContainerRequest( + Resource.newInstance(2048, 1), nodes, racks, priority, "minimum"); + ContainerRequest storedContainer5 = new ContainerRequest( + Resource.newInstance(2048, 1), nodes, racks, priority2, "default"); + ContainerRequest storedContainer6 = new ContainerRequest( + Resource.newInstance(2048, 1), nodes, racks, priority, "default"); + ContainerRequest storedContainer7 = new ContainerRequest( + Resource.newInstance(0, 0), nodes, racks, priority, "http"); amClient.addContainerRequest(storedContainer1); @@ -2118,11 +2098,8 @@ public void testGetMatchingFitWithProfiles() throws Exception { List> matches; ContainerRequest storedRequest; // exact match - ProfileCapability testCapability1 = - ProfileCapability.newInstance("minimum"); - matches = amClient - .getMatchingRequests(priority, node, ExecutionType.GUARANTEED, - testCapability1); + matches = amClient.getMatchingRequests(priority, node, + ExecutionType.GUARANTEED, Resource.newInstance(0, 0), "minimum"); verifyMatches(matches, 1); storedRequest = matches.get(0).iterator().next(); assertEquals(storedContainer1, storedRequest); @@ -2131,11 +2108,9 @@ public void testGetMatchingFitWithProfiles() throws Exception { // exact matching with order maintained // we should get back 3 matches - default + http because they have the // same capability - ProfileCapability testCapability2 = - ProfileCapability.newInstance("default"); matches = amClient .getMatchingRequests(priority, node, ExecutionType.GUARANTEED, - testCapability2); + Resource.newInstance(0, 0), "default"); verifyMatches(matches, 2); // must be returned in the order they were made int i = 0; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java index c87123ad38a..e9a3caa18bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java @@ -277,10 +277,8 @@ private void verifyResourceRequest( AMRMClientImpl client, ContainerRequest request, String location, boolean expectedRelaxLocality, ExecutionType executionType) { - ProfileCapability profileCapability = ProfileCapability - .newInstance(request.getResourceProfile(), request.getCapability()); ResourceRequest ask = client.getTable(0).get(request.getPriority(), - location, executionType, profileCapability).remoteRequest; + location, executionType, request.getCapability()).remoteRequest; assertEquals(location, ask.getResourceName()); assertEquals(1, ask.getNumContainers()); assertEquals(expectedRelaxLocality, ask.getRelaxLocality()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java index 23e557279df..eb2ecb96cf2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java @@ -18,22 +18,6 @@ package org.apache.hadoop.yarn.client.api.impl; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.security.Credentials; @@ -58,7 +42,6 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.ProfileCapability; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -84,6 +67,22 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + public class TestNMClient { Configuration conf = null; MiniYARNCluster yarnCluster = null; @@ -332,11 +331,9 @@ public void testNMClient() racks, priority)); } - ProfileCapability profileCapability = - ProfileCapability.newInstance(capability); int containersRequestedAny = rmClient.getTable(0) .get(priority, ResourceRequest.ANY, ExecutionType.GUARANTEED, - profileCapability).remoteRequest.getNumContainers(); + capability).remoteRequest.getNumContainers(); // RM should allocate container within 2 calls to allocate() int allocatedContainerCount = 0; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocationE2E.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocationE2E.java index 94cb28e1fa6..9b4040eeeef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocationE2E.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestOpportunisticContainerAllocationE2E.java @@ -100,7 +100,6 @@ private static final long AM_EXPIRE_MS = 4000; private static Resource capability; - private static ProfileCapability profileCapability; private static Priority priority; private static Priority priority2; private static Priority priority3; @@ -153,7 +152,6 @@ public static void setup() throws Exception { priority3 = Priority.newInstance(3); priority4 = Priority.newInstance(4); capability = Resource.newInstance(512, 1); - profileCapability = ProfileCapability.newInstance(capability); node = nodeReports.get(0).getNodeId().getHost(); rack = nodeReports.get(0).getRackName(); @@ -276,7 +274,7 @@ public void testPromotionFromAcquired() throws YarnException, IOException { int oppContainersRequestedAny = amClient.getTable(0).get(priority2, ResourceRequest.ANY, - ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest + ExecutionType.OPPORTUNISTIC, capability).remoteRequest .getNumContainers(); assertEquals(1, oppContainersRequestedAny); @@ -397,7 +395,7 @@ public void testDemotionFromAcquired() throws YarnException, IOException { new AMRMClient.ContainerRequest(capability, null, null, priority3)); int guarContainersRequestedAny = amClient.getTable(0).get(priority3, - ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability) + ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) .remoteRequest.getNumContainers(); assertEquals(1, guarContainersRequestedAny); @@ -536,17 +534,17 @@ public void testMixedAllocationAndRelease() throws YarnException, ExecutionType.OPPORTUNISTIC, true))); int containersRequestedNode = amClient.getTable(0).get(priority, - node, ExecutionType.GUARANTEED, profileCapability).remoteRequest + node, ExecutionType.GUARANTEED, capability).remoteRequest .getNumContainers(); int containersRequestedRack = amClient.getTable(0).get(priority, - rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest + rack, ExecutionType.GUARANTEED, capability).remoteRequest .getNumContainers(); int containersRequestedAny = amClient.getTable(0).get(priority, - ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability) + ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) .remoteRequest.getNumContainers(); int oppContainersRequestedAny = amClient.getTable(0).get(priority2, ResourceRequest.ANY, - ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest + ExecutionType.OPPORTUNISTIC, capability).remoteRequest .getNumContainers(); assertEquals(4, containersRequestedNode); @@ -568,17 +566,17 @@ public void testMixedAllocationAndRelease() throws YarnException, ExecutionType.OPPORTUNISTIC, true))); containersRequestedNode = amClient.getTable(0).get(priority, - node, ExecutionType.GUARANTEED, profileCapability).remoteRequest + node, ExecutionType.GUARANTEED, capability).remoteRequest .getNumContainers(); containersRequestedRack = amClient.getTable(0).get(priority, - rack, ExecutionType.GUARANTEED, profileCapability).remoteRequest + rack, ExecutionType.GUARANTEED, capability).remoteRequest .getNumContainers(); containersRequestedAny = amClient.getTable(0).get(priority, - ResourceRequest.ANY, ExecutionType.GUARANTEED, profileCapability) + ResourceRequest.ANY, ExecutionType.GUARANTEED, capability) .remoteRequest.getNumContainers(); oppContainersRequestedAny = amClient.getTable(0).get(priority2, ResourceRequest.ANY, - ExecutionType.OPPORTUNISTIC, profileCapability).remoteRequest + ExecutionType.OPPORTUNISTIC, capability).remoteRequest .getNumContainers(); assertEquals(2, containersRequestedNode); @@ -697,7 +695,7 @@ public void testOpportunisticAllocation() throws YarnException, IOException { int oppContainersRequestedAny = amClient.getTable(0) .get(priority3, ResourceRequest.ANY, ExecutionType.OPPORTUNISTIC, - profileCapability).remoteRequest.getNumContainers(); + capability).remoteRequest.getNumContainers(); assertEquals(2, oppContainersRequestedAny);