diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 43058240707..622bbaa60c0 100755 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -122,6 +122,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ProfileCapability; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -162,6 +163,7 @@ private final org.apache.hadoop.mapred.JobID oldJobId; private final TaskAttemptListener taskAttemptListener; private final Resource resourceCapability; + private String resourceProfile = ProfileCapability.DEFAULT_PROFILE; protected Set dataLocalHosts; protected Set dataLocalRacks; private final List diagnostics = new ArrayList(); @@ -670,6 +672,8 @@ public TaskAttemptImpl(TaskId taskId, int i, getMemoryRequired(conf, taskId.getTaskType())); this.resourceCapability.setVirtualCores( getCpuRequired(conf, taskId.getTaskType())); + this.resourceProfile = getResourceProfile(conf, taskId.getTaskType()); + LOG.info("Using resource profile '" + this.resourceProfile + "' for " + taskId); this.dataLocalHosts = resolveHosts(dataLocalHosts); RackResolver.init(conf); @@ -705,6 +709,19 @@ private int getCpuRequired(Configuration conf, TaskType taskType) { return vcores; } + private String getResourceProfile(JobConf conf, TaskType taskType) { + String ret = ProfileCapability.DEFAULT_PROFILE; + if (taskType == TaskType.MAP) { + ret = conf.get(MRJobConfig.MAP_RESOURCE_PROFILE, + ProfileCapability.DEFAULT_PROFILE); + } else if (taskType == TaskType.REDUCE) { + ret = conf.get(MRJobConfig.REDUCE_RESOURCE_PROFILE, + ProfileCapability.DEFAULT_PROFILE); + } + + return ret; + } + /** * Create a {@link LocalResource} record with all the given parameters. */ @@ -1628,14 +1645,15 @@ public void transition(TaskAttemptImpl taskAttempt, taskAttempt.eventHandler.handle( ContainerRequestEvent.createContainerRequestEventForFailedContainer( taskAttempt.attemptId, - taskAttempt.resourceCapability)); + taskAttempt.resourceCapability, taskAttempt.resourceProfile)); } else { taskAttempt.eventHandler.handle(new ContainerRequestEvent( taskAttempt.attemptId, taskAttempt.resourceCapability, taskAttempt.dataLocalHosts.toArray( new String[taskAttempt.dataLocalHosts.size()]), taskAttempt.dataLocalRacks.toArray( - new String[taskAttempt.dataLocalRacks.size()]))); + new String[taskAttempt.dataLocalRacks.size()]), + taskAttempt.resourceProfile)); } } } diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java index 215a2b35c20..b806f2fc878 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java @@ -18,7 +18,9 @@ package org.apache.hadoop.mapreduce.v2.app.rm; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.yarn.api.records.ProfileCapability; import org.apache.hadoop.yarn.api.records.Resource; @@ -28,26 +30,41 @@ private final String[] hosts; private final String[] racks; private boolean earlierAttemptFailed = false; + private String resourceProfile; - public ContainerRequestEvent(TaskAttemptId attemptID, - Resource capability, - String[] hosts, String[] racks) { + public ContainerRequestEvent(TaskAttemptId attemptID, Resource capability, + String[] hosts, String[] racks, String resourceProfile) { super(attemptID, ContainerAllocator.EventType.CONTAINER_REQ); this.capability = capability; this.hosts = hosts; this.racks = racks; + this.resourceProfile = resourceProfile; + } + + public ContainerRequestEvent(TaskAttemptId attemptID, Resource capability, + String[] hosts, String[] racks) { + this(attemptID, capability, hosts, racks, + ProfileCapability.DEFAULT_PROFILE); } ContainerRequestEvent(TaskAttemptId attemptID, Resource capability) { this(attemptID, capability, new String[0], new String[0]); this.earlierAttemptFailed = true; } - + public static ContainerRequestEvent createContainerRequestEventForFailedContainer( - TaskAttemptId attemptID, - Resource capability) { + TaskAttemptId attemptID, Resource capability) { + return createContainerRequestEventForFailedContainer(attemptID, capability, + ProfileCapability.DEFAULT_PROFILE); + } + + public static ContainerRequestEvent createContainerRequestEventForFailedContainer( + TaskAttemptId attemptID, Resource capability, String resourceProfile) { //ContainerRequest for failed events does not consider rack / node locality? - return new ContainerRequestEvent(attemptID, capability); + ContainerRequestEvent event = + new ContainerRequestEvent(attemptID, capability); + event.setResourceProfile(resourceProfile); + return event; } public Resource getCapability() { @@ -65,4 +82,12 @@ public Resource getCapability() { public boolean getEarlierAttemptFailed() { return earlierAttemptFailed; } + + public String getResourceProfile() { + return resourceProfile; + } + + public void setResourceProfile(String resourceProfile) { + this.resourceProfile = resourceProfile; + } } \ No newline at end of file diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java index 6cec2f3abfc..3b21cd5d7bd 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java @@ -22,6 +22,7 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.EnumSet; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -80,6 +81,7 @@ protected Map applicationACLs; private volatile long lastHeartbeatTime; private ConcurrentLinkedQueue heartbeatCallbacks; + private Map resourceProfilesMap = new HashMap<>(); private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); @@ -172,6 +174,7 @@ protected void register() { LOG.info("queue: " + queue); job.setQueueName(queue); this.schedulerResourceTypes.addAll(response.getSchedulerResourceTypes()); + resourceProfilesMap = response.getResourceProfiles(); } catch (Exception are) { LOG.error("Exception while registering", are); throw new YarnRuntimeException(are); @@ -355,4 +358,8 @@ protected boolean isApplicationMasterRegistered() { public EnumSet getSchedulerResourceTypes() { return schedulerResourceTypes; } + + public Map getResourceProfilesMap() { + return resourceProfilesMap; + } } diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 1f88a2c70e3..95c2afefd0b 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.PreemptionMessage; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ProfileCapability; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.ClientRMProxy; @@ -90,6 +91,7 @@ import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.webapp.hamlet.Hamlet; /** * Allocates the container from the ResourceManager scheduler. @@ -427,7 +429,17 @@ private void handleReduceContainerRequest(ContainerRequestEvent reqEvent) { JobId jobId = getJob().getID(); if (reduceResourceRequest.equals(Resources.none())) { - reduceResourceRequest = reqEvent.getCapability(); + if (!getResourceProfilesMap().isEmpty() && !getResourceProfilesMap() + .containsKey(reqEvent.getResourceProfile())) { + throw new YarnRuntimeException( + "Unknown resource profile type '" + reqEvent.getResourceProfile() + + "'. The resource profiles are " + getResourceProfilesMap() + .keySet()); + } + ProfileCapability profileCapability = ProfileCapability + .newInstance(reqEvent.getResourceProfile(), reqEvent.getCapability()); + reduceResourceRequest = ProfileCapability + .toResource(profileCapability, getResourceProfilesMap()); eventHandler.handle(new JobHistoryEvent(jobId, new NormalizedResourceEvent( org.apache.hadoop.mapreduce.TaskType.REDUCE, @@ -480,10 +492,19 @@ private void handleMapContainerRequest(ContainerRequestEvent reqEvent) { JobId jobId = getJob().getID(); if (mapResourceRequest.equals(Resources.none())) { - mapResourceRequest = reqEvent.getCapability(); + if (!getResourceProfilesMap().isEmpty() && !getResourceProfilesMap() + .containsKey(reqEvent.getResourceProfile())) { + throw new YarnRuntimeException( + "Unknown resource profile type '" + reqEvent.getResourceProfile() + + "'. The resource profiles are " + getResourceProfilesMap() + .keySet()); + } + ProfileCapability profileCapability = ProfileCapability + .newInstance(reqEvent.getResourceProfile(), reqEvent.getCapability()); + mapResourceRequest = ProfileCapability + .toResource(profileCapability, getResourceProfilesMap()); eventHandler.handle(new JobHistoryEvent(jobId, - new NormalizedResourceEvent( - org.apache.hadoop.mapreduce.TaskType.MAP, + new NormalizedResourceEvent(org.apache.hadoop.mapreduce.TaskType.MAP, mapResourceRequest.getMemorySize()))); LOG.info("mapResourceRequest:" + mapResourceRequest); } @@ -1115,7 +1136,8 @@ void addMap(ContainerRequestEvent event) { new ContainerRequest(event, PRIORITY_OPPORTUNISTIC_MAP, mapNodeLabelExpression); maps.put(event.getAttemptID(), request); - addOpportunisticResourceRequest(request.priority, request.capability); + addOpportunisticResourceRequest(request.priority, request.capability, + request.resourceProfile); } else { request = new ContainerRequest(event, PRIORITY_MAP, mapNodeLabelExpression); diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java index f4579abd476..796f9f69b36 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ProfileCapability; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -125,6 +126,7 @@ public RMContainerRequestor(ClientService clientService, AppContext context) { //final boolean earlierAttemptFailed; final Priority priority; final String nodeLabelExpression; + final String resourceProfile; /** * the time when this request object was formed; can be used to avoid @@ -135,25 +137,28 @@ public RMContainerRequestor(ClientService clientService, AppContext context) { public ContainerRequest(ContainerRequestEvent event, Priority priority, String nodeLabelExpression) { this(event.getAttemptID(), event.getCapability(), event.getHosts(), - event.getRacks(), priority, nodeLabelExpression); + event.getRacks(), priority, nodeLabelExpression, + event.getResourceProfile()); } public ContainerRequest(ContainerRequestEvent event, Priority priority, long requestTimeMs) { this(event.getAttemptID(), event.getCapability(), event.getHosts(), - event.getRacks(), priority, requestTimeMs,null); + event.getRacks(), priority, requestTimeMs, null, + event.getResourceProfile()); } - public ContainerRequest(TaskAttemptId attemptID, - Resource capability, String[] hosts, String[] racks, - Priority priority, String nodeLabelExpression) { + public ContainerRequest(TaskAttemptId attemptID, Resource capability, + String[] hosts, String[] racks, Priority priority, + String nodeLabelExpression, String resourceProfile) { this(attemptID, capability, hosts, racks, priority, - System.currentTimeMillis(), nodeLabelExpression); + System.currentTimeMillis(), nodeLabelExpression, resourceProfile); } public ContainerRequest(TaskAttemptId attemptID, Resource capability, String[] hosts, String[] racks, - Priority priority, long requestTimeMs,String nodeLabelExpression) { + Priority priority, long requestTimeMs,String nodeLabelExpression, + String resourceProfile) { this.attemptID = attemptID; this.capability = capability; this.hosts = hosts; @@ -161,6 +166,7 @@ public ContainerRequest(TaskAttemptId attemptID, this.priority = priority; this.requestTimeMs = requestTimeMs; this.nodeLabelExpression = nodeLabelExpression; + this.resourceProfile = resourceProfile; } public String toString() { @@ -168,6 +174,11 @@ public String toString() { sb.append("AttemptId[").append(attemptID).append("]"); sb.append("Capability[").append(capability).append("]"); sb.append("Priority[").append(priority).append("]"); + sb.append("Hosts[").append(hosts).append("]"); + sb.append("Racks[").append(racks).append("]"); + sb.append("RequestTimeMs[").append(requestTimeMs).append("]"); + sb.append("NodeLabelExpression[").append(nodeLabelExpression).append("]"); + sb.append("ResourceProfile[").append(resourceProfile).append("]"); return sb.toString(); } } @@ -398,49 +409,49 @@ protected void addContainerReq(ContainerRequest req) { // Data-local if (!isNodeBlacklisted(host)) { addResourceRequest(req.priority, host, req.capability, - null); + null, req.resourceProfile); } } // Nothing Rack-local for now for (String rack : req.racks) { addResourceRequest(req.priority, rack, req.capability, - null); + null, req.resourceProfile); } // Off-switch addResourceRequest(req.priority, ResourceRequest.ANY, req.capability, - req.nodeLabelExpression); + req.nodeLabelExpression, req.resourceProfile); } protected void decContainerReq(ContainerRequest req) { // Update resource requests for (String hostName : req.hosts) { - decResourceRequest(req.priority, hostName, req.capability); + decResourceRequest(req.priority, hostName, req.capability, req.resourceProfile); } for (String rack : req.racks) { - decResourceRequest(req.priority, rack, req.capability); + decResourceRequest(req.priority, rack, req.capability, req.resourceProfile); } - decResourceRequest(req.priority, ResourceRequest.ANY, req.capability); + decResourceRequest(req.priority, ResourceRequest.ANY, req.capability, req.resourceProfile); } protected void addOpportunisticResourceRequest(Priority priority, - Resource capability) { + Resource capability, String profile) { addResourceRequest(priority, ResourceRequest.ANY, capability, null, - ExecutionType.OPPORTUNISTIC); + ExecutionType.OPPORTUNISTIC, profile); } private void addResourceRequest(Priority priority, String resourceName, - Resource capability, String nodeLabelExpression) { + Resource capability, String nodeLabelExpression, String profile) { addResourceRequest(priority, resourceName, capability, nodeLabelExpression, - ExecutionType.GUARANTEED); + ExecutionType.GUARANTEED, profile); } private void addResourceRequest(Priority priority, String resourceName, Resource capability, String nodeLabelExpression, - ExecutionType executionType) { + ExecutionType executionType, String profile) { Map> remoteRequests = this.remoteRequestsTable.get(priority); if (remoteRequests == null) { @@ -450,22 +461,27 @@ private void addResourceRequest(Priority priority, String resourceName, LOG.debug("Added priority=" + priority); } } + ProfileCapability profileCapability = + ProfileCapability.newInstance(profile, capability); + Resource reqCapability = ProfileCapability + .toResource(profileCapability, getResourceProfilesMap()); Map reqMap = remoteRequests.get(resourceName); if (reqMap == null) { reqMap = new HashMap(); remoteRequests.put(resourceName, reqMap); } - ResourceRequest remoteRequest = reqMap.get(capability); + ResourceRequest remoteRequest = reqMap.get(reqCapability); if (remoteRequest == null) { remoteRequest = recordFactory.newRecordInstance(ResourceRequest.class); remoteRequest.setPriority(priority); remoteRequest.setResourceName(resourceName); - remoteRequest.setCapability(capability); + remoteRequest.setProfileCapability(profileCapability); + remoteRequest.setCapability(reqCapability); remoteRequest.setNumContainers(0); remoteRequest.setNodeLabelExpression(nodeLabelExpression); remoteRequest.setExecutionTypeRequest( ExecutionTypeRequest.newInstance(executionType, true)); - reqMap.put(capability, remoteRequest); + reqMap.put(reqCapability, remoteRequest); } remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1); @@ -480,7 +496,11 @@ private void addResourceRequest(Priority priority, String resourceName, } private void decResourceRequest(Priority priority, String resourceName, - Resource capability) { + Resource capability, String resourceProfile) { + + Resource resource = ProfileCapability + .toResource(ProfileCapability.newInstance(resourceProfile, capability), + getResourceProfilesMap()); Map> remoteRequests = this.remoteRequestsTable.get(priority); Map reqMap = remoteRequests.get(resourceName); @@ -494,7 +514,8 @@ private void decResourceRequest(Priority priority, String resourceName, } return; } - ResourceRequest remoteRequest = reqMap.get(capability); + + ResourceRequest remoteRequest = reqMap.get(resource); if (LOG.isDebugEnabled()) { LOG.debug("BEFORE decResourceRequest:" + " applicationId=" @@ -510,7 +531,7 @@ private void decResourceRequest(Priority priority, String resourceName, } if (remoteRequest.getNumContainers() == 0) { - reqMap.remove(capability); + reqMap.remove(resource); if (reqMap.size() == 0) { remoteRequests.remove(resourceName); } @@ -558,8 +579,9 @@ protected ContainerRequest getFilteredContainerRequest(ContainerRequest orig) { } } String[] hosts = newHosts.toArray(new String[newHosts.size()]); - ContainerRequest newReq = new ContainerRequest(orig.attemptID, orig.capability, - hosts, orig.racks, orig.priority, orig.nodeLabelExpression); + ContainerRequest newReq = + new ContainerRequest(orig.attemptID, orig.capability, hosts, orig.racks, + orig.priority, orig.nodeLabelExpression, orig.resourceProfile); return newReq; } diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 0a4d222f916..70192b8eb6b 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -292,6 +292,9 @@ public static final String MAP_CPU_VCORES = "mapreduce.map.cpu.vcores"; public static final int DEFAULT_MAP_CPU_VCORES = 1; + public static final String MAP_RESOURCE_PROFILE = + "mapreduce.map.resource_profile"; + public static final String MAP_ENV = "mapreduce.map.env"; public static final String MAP_JAVA_OPTS = "mapreduce.map.java.opts"; @@ -340,6 +343,10 @@ public static final String REDUCE_CPU_VCORES = "mapreduce.reduce.cpu.vcores"; public static final int DEFAULT_REDUCE_CPU_VCORES = 1; + public static final String REDUCE_RESOURCE_PROFILE = + "mapreduce.reduce.resource_profile"; + public static final String DEFAULT_REDUCE_RESOURCE_PROFILE = ""; + public static final String REDUCE_MEMORY_TOTAL_BYTES = "mapreduce.reduce.memory.totalbytes"; public static final String SHUFFLE_INPUT_BUFFER_PERCENT = "mapreduce.reduce.shuffle.input.buffer.percent"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java index 1a8d1c3a5b4..198e013180b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java @@ -45,11 +45,13 @@ */ @InterfaceAudience.Public @InterfaceStability.Unstable -public abstract class ProfileCapability { +public abstract class ProfileCapability implements Comparable { public static final String DEFAULT_PROFILE = "default"; public static ProfileCapability newInstance(Resource override) { + Preconditions + .checkArgument(override != null, "Override resource can't be null"); return newInstance(DEFAULT_PROFILE, override); } @@ -66,6 +68,8 @@ public static ProfileCapability newInstance(String profile, Resource override) { Preconditions .checkArgument(profile != null, "The profile name cannot be null"); + Preconditions + .checkArgument(override != null, "Override resource can't be null"); ProfileCapability obj = Records.newRecord(ProfileCapability.class); obj.setProfileName(profile); obj.setProfileCapabilityOverride(override); @@ -153,6 +157,9 @@ public static Resource toResource(ProfileCapability capability, Resource none = Resource.newInstance(0, 0); Resource resource = Resource.newInstance(0, 0); String profileName = capability.getProfileName(); + if (resourceProfilesMap.isEmpty()) { + return capability.getProfileCapabilityOverride(); + } if (profileName.isEmpty()) { profileName = DEFAULT_PROFILE; } @@ -164,11 +171,24 @@ public static Resource toResource(ProfileCapability capability, !capability.getProfileCapabilityOverride().equals(none)) { for (Map.Entry entry : capability .getProfileCapabilityOverride().getResources().entrySet()) { - if (entry.getValue() != null && entry.getValue().getValue() >= 0) { - resource.setResourceInformation(entry.getKey(), entry.getValue()); + if (entry.getKey().equals(ResourceInformation.MEMORY_MB.getName()) + || entry.getKey().equals(ResourceInformation.VCORES.getName())) { + if (entry.getValue() != null && entry.getValue().getValue() > 0) { + resource.setResourceInformation(entry.getKey(), entry.getValue()); + } } } } return resource; } + + @Override + public int compareTo(ProfileCapability other) { + int ret = this.getProfileName().compareTo(other.getProfileName()); + if (ret == 0) { + ret = this.getProfileCapabilityOverride() + .compareTo(other.getProfileCapabilityOverride()); + } + return ret; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java index 103bcfde868..4b085eb4f23 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java @@ -21,15 +21,18 @@ import org.apache.commons.lang.NotImplementedException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.Records; import java.util.Map; +import java.util.Set; /** *

Resource models a set of computer resources in the @@ -76,8 +79,35 @@ public static Resource newInstance(long memory, int vCores) { return resource; } - @InterfaceAudience.Private - @InterfaceStability.Unstable + /** + * Create a new Resource object with the values specified for memory and + * vcores. Use the supplied default value for every other resource type. + * @param memory value for memory + * @param vCores value for vcores + * @param defaultValue default value for every other resource type + * @return + */ + @Public + @Unstable + public static Resource newInstance(long memory, int vCores, + long defaultValue) { + Resource ret = Records.newRecord(Resource.class); + Set resources = ret.getResources().keySet(); + for (String resource : resources) { + try { + ret.getResourceInformation(resource).setValue(defaultValue); + } catch (YarnException ye) { + ret.setResourceInformation(resource, + ResourceInformation.newInstance(resource, defaultValue)); + } + } + ret.setMemorySize(memory); + ret.setVirtualCores(vCores); + return ret; + } + + @Private + @Unstable public static Resource newInstance(Resource resource) { Resource ret = Resource.newInstance(0, 0); for (Map.Entry entry : resource.getResources() diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java index c1339b00b13..e0a15424f1b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java @@ -636,6 +636,9 @@ public boolean equals(Object obj) { return false; } } + if (!getProfileCapability().equals(other.getProfileCapability())) { + return false; + } return true; } @@ -655,8 +658,14 @@ public int compareTo(ResourceRequest other) { int numContainerComparison = this.getNumContainers() - other.getNumContainers(); if (numContainerComparison == 0) { - return Long.compare(getAllocationRequestId(), - other.getAllocationRequestId()); + int profileComparison = this.getProfileCapability() + .compareTo(other.getProfileCapability()); + if (profileComparison == 0) { + return Long.compare(getAllocationRequestId(), + other.getAllocationRequestId()); + } else { + return profileComparison; + } } else { return numContainerComparison; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index 7f9c0b83579..93867a96b92 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -89,6 +89,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ProfileCapability; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.UpdatedContainer; @@ -1481,7 +1482,8 @@ private ProfileCapability createProfileCapability() } Resource resourceCapability = - Resource.newInstance(containerMemory, containerVirtualCores); + Resource.newInstance(containerMemory, containerVirtualCores, -1); + if (resourceProfiles == null) { containerMemory = containerMemory == -1 ? DEFAULT_CONTAINER_MEMORY : containerMemory; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 0c6d2d3a418..4d59048c339 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -1005,7 +1005,7 @@ private void setAMResourceCapability(ApplicationSubmissionContext appContext, appContext.getAMContainerResourceRequest().setProfileCapability( ProfileCapability.newInstance(tmp, Resource.newInstance(0, 0))); } - Resource capability = Resource.newInstance(0, 0); + Resource capability = Resource.newInstance(0, 0, -1); // set amMemory because it's used to set Xmx param if (profiles == null) { amMemory = memory == -1 ? DEFAULT_AM_MEMORY : memory; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java index 539d6b27ffc..3e6127a7476 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java @@ -612,6 +612,12 @@ public static void convertProfileToResourceCapability(ResourceRequest ask, } } else { if (ask.getProfileCapability() != null) { + if (resourceProfilesManager + .getProfile(ask.getProfileCapability().getProfileName()) == null) { + throw new YarnException( + "Unknown resource profile '" + ask.getProfileCapability() + .getProfileName() + "'."); + } ask.setCapability(ProfileCapability .toResource(ask.getProfileCapability(), resourceProfilesManager.getResourceProfiles())); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceProfilesManagerImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceProfilesManagerImpl.java index 8839bf908bf..d666d89027b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceProfilesManagerImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ResourceProfilesManagerImpl.java @@ -25,6 +25,8 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.codehaus.jackson.map.ObjectMapper; @@ -146,6 +148,9 @@ private Resource parseResource(String key, Map value) throws IOException { } public Resource getProfile(String profile) { + if (!profiles.containsKey(profile)) { + return null; + } return Resources.clone(profiles.get(profile)); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resource/TestResourceProfiles.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resource/TestResourceProfiles.java index c542ed8853f..afa87fed897 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resource/TestResourceProfiles.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resource/TestResourceProfiles.java @@ -18,12 +18,20 @@ package org.apache.hadoop.yarn.server.resourcemanager.resource; +import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ProfileCapability; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.junit.Assert; import org.junit.Test; +import java.io.File; import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -52,12 +60,9 @@ public void testProfilesEnabled() throws Exception { @Test public void testLoadProfiles() throws Exception { - ResourceProfilesManager manager = new ResourceProfilesManagerImpl(); - Configuration conf = new Configuration(); - conf.setBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED, true); - conf.set(YarnConfiguration.RM_RESOURCE_PROFILES_SOURCE_FILE, - "profiles/sample-profiles-1.json"); - manager.init(conf); + ResourceProfilesManager manager = + createAndInitResourceProfilesManager("profiles/sample-profiles-1.json"); + Map profiles = manager.getResourceProfiles(); Map expected = new HashMap<>(); expected.put("minimum", Resource.newInstance(1024, 1)); @@ -96,12 +101,8 @@ public void testLoadProfilesMissingMandatoryProfile() throws Exception { @Test public void testGetProfile() throws Exception { - Configuration conf = new Configuration(); - conf.setBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED, true); - ResourceProfilesManager manager = new ResourceProfilesManagerImpl(); - conf.set(YarnConfiguration.RM_RESOURCE_PROFILES_SOURCE_FILE, - "profiles/sample-profiles-2.json"); - manager.init(conf); + ResourceProfilesManager manager = + createAndInitResourceProfilesManager("profiles/sample-profiles-2.json"); Map expected = new HashMap<>(); expected.put("minimum", Resource.newInstance(1024, 1)); expected.put("default", Resource.newInstance(2048, 2)); @@ -116,16 +117,15 @@ public void testGetProfile() throws Exception { Assert.assertEquals("Profile " + profile + "' resources don't match", res, manager.getProfile(profile)); } + + Assert.assertTrue("Return value for unknown profile should be null", + manager.getProfile("random") == null); } @Test public void testGetMandatoryProfiles() throws Exception { - ResourceProfilesManager manager = new ResourceProfilesManagerImpl(); - Configuration conf = new Configuration(); - conf.setBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED, true); - conf.set(YarnConfiguration.RM_RESOURCE_PROFILES_SOURCE_FILE, - "profiles/sample-profiles-1.json"); - manager.init(conf); + ResourceProfilesManager manager = + createAndInitResourceProfilesManager("profiles/sample-profiles-1.json"); Map expected = new HashMap<>(); expected.put("minimum", Resource.newInstance(1024, 1)); expected.put("default", Resource.newInstance(2048, 2)); @@ -139,4 +139,71 @@ public void testGetMandatoryProfiles() throws Exception { expected.get("maximum"), manager.getMaximumProfile()); } + + @Test + public void testConvertProfileToResourceCapability() throws Exception { + File dest = null; + try { + Configuration conf = + createConfiguration("profiles/sample-profiles-3.json"); + File source = new File( + conf.getClassLoader().getResource("resource-types-2.xml").getFile()); + dest = new File(source.getParent(), "resource-types.xml"); + FileUtils.copyFile(source, dest); + ResourceUtils.resetResourceTypes(conf); + ResourceProfilesManager manager = new ResourceProfilesManagerImpl(); + manager.init(conf); + Map expectedResultsMap = new HashMap<>(); + expectedResultsMap.put( + ProfileCapability.newInstance("small", Resource.newInstance(3072, 1)), + Resource.newInstance(3072, 1, 4)); + expectedResultsMap.put( + ProfileCapability.newInstance("small", Resource.newInstance(1024, 2)), + Resource.newInstance(1024, 2, 4)); + expectedResultsMap.put( + ProfileCapability.newInstance("small", Resource.newInstance(0, 0)), + Resource.newInstance(1024, 1, 4)); + + // override the extra resource, but it shouldn't apply + expectedResultsMap.put(ProfileCapability + .newInstance("small", Resource.newInstance(3072, 1, 2)), + Resource.newInstance(3072, 1, 4)); + + for (Map.Entry entry : expectedResultsMap + .entrySet()) { + ResourceRequest ask = ResourceRequest + .newInstance(Priority.UNDEFINED, "test", Resource.newInstance(0, 0), + 4, false, "*", ExecutionTypeRequest.newInstance(), + entry.getKey()); + RMServerUtils.convertProfileToResourceCapability(ask, conf, manager); + String message = + "Incorrect conversion, expected '" + entry.getValue() + "'; got '" + + ask.getCapability(); + Assert.assertEquals(message, entry.getValue(), ask.getCapability()); + } + } finally { + if (dest != null) { + dest.delete(); + ResourceUtils.resetResourceTypes(null); + } + + } + + } + + private Configuration createConfiguration(String resourceProfilesFilename) { + Configuration conf = new Configuration(); + conf.setBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED, true); + conf.set(YarnConfiguration.RM_RESOURCE_PROFILES_SOURCE_FILE, + resourceProfilesFilename); + return conf; + } + + private ResourceProfilesManager createAndInitResourceProfilesManager( + String resourceProfilesFilename) throws Exception { + ResourceProfilesManager manager = new ResourceProfilesManagerImpl(); + Configuration conf = createConfiguration(resourceProfilesFilename); + manager.init(conf); + return manager; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/profiles/sample-profiles-3.json hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/profiles/sample-profiles-3.json new file mode 100644 index 00000000000..8b94a70b3e5 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/profiles/sample-profiles-3.json @@ -0,0 +1,32 @@ +{ + "minimum": { + "memory-mb" : 1024, + "vcores" : 1, + "resource1" : 0 + }, + "default" : { + "memory-mb" : 2048, + "vcores" : 2, + "resource1" : 0 + }, + "maximum" : { + "memory-mb": 4096, + "vcores" : 4, + "resource1" : "4G" + }, + "small" : { + "memory-mb": 1024, + "vcores": 1, + "resource1" : "4G" + }, + "medium" : { + "memory-mb": 2048, + "vcores": 1, + "resource1" : 0 + }, + "large": { + "memory-mb" : 4096, + "vcores" : 4, + "resource1" : "2G" + } +}