diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 3faad480b9d..91d04de2685 100755 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -122,6 +122,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ProfileCapability; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -162,18 +163,19 @@ private final org.apache.hadoop.mapred.JobID oldJobId; private final TaskAttemptListener taskAttemptListener; private final Resource resourceCapability; + private final String resourceProfile; protected Set dataLocalHosts; protected Set dataLocalRacks; private final List diagnostics = new ArrayList(); private final Lock readLock; private final Lock writeLock; private final AppContext appContext; - private Credentials credentials; - private Token jobToken; - private static AtomicBoolean initialClasspathFlag = new AtomicBoolean(); + private final Credentials credentials; + private final Token jobToken; + private static final AtomicBoolean initialClasspathFlag = new AtomicBoolean(); private static String initialClasspath = null; private static String initialAppClasspath = null; - private static Object commonContainerSpecLock = new Object(); + private static final Object commonContainerSpecLock = new Object(); private static ContainerLaunchContext commonContainerSpec = null; private static final Object classpathLock = new Object(); private long launchTime; @@ -670,6 +672,7 @@ public TaskAttemptImpl(TaskId taskId, int i, getMemoryRequired(conf, taskId.getTaskType())); this.resourceCapability.setVirtualCores( getCpuRequired(conf, taskId.getTaskType())); + this.resourceProfile = getResourceProfile(conf, taskId.getTaskType()); this.dataLocalHosts = resolveHosts(dataLocalHosts); RackResolver.init(conf); @@ -705,6 +708,23 @@ private int getCpuRequired(Configuration conf, TaskType taskType) { return vcores; } + private String getResourceProfile(JobConf conf, TaskType taskType) { + String ret = ProfileCapability.DEFAULT_PROFILE; + + if (taskType == TaskType.MAP) { + ret = conf.get(MRJobConfig.MAP_RESOURCE_PROFILE, + ProfileCapability.DEFAULT_PROFILE); + } else if (taskType == TaskType.REDUCE) { + ret = conf.get(MRJobConfig.REDUCE_RESOURCE_PROFILE, + ProfileCapability.DEFAULT_PROFILE); + } + + LOG.info("Using resource profile '" + ret + "' for " + + attemptId.getTaskId()); + + return ret; + } + /** * Create a {@link LocalResource} record with all the given parameters. */ @@ -1649,10 +1669,11 @@ public void transition(TaskAttemptImpl taskAttempt, taskAttempt.eventHandler.handle( ContainerRequestEvent.createContainerRequestEventForFailedContainer( taskAttempt.attemptId, - taskAttempt.resourceCapability)); + taskAttempt.resourceCapability, taskAttempt.resourceProfile)); } else { taskAttempt.eventHandler.handle(new ContainerRequestEvent( taskAttempt.attemptId, taskAttempt.resourceCapability, + taskAttempt.resourceProfile, taskAttempt.dataLocalHosts.toArray( new String[taskAttempt.dataLocalHosts.size()]), taskAttempt.dataLocalRacks.toArray( diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java index 215a2b35c20..fc611e31a28 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java @@ -19,6 +19,7 @@ package org.apache.hadoop.mapreduce.v2.app.rm; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.yarn.api.records.ProfileCapability; import org.apache.hadoop.yarn.api.records.Resource; @@ -28,26 +29,34 @@ private final String[] hosts; private final String[] racks; private boolean earlierAttemptFailed = false; + private String resourceProfile; - public ContainerRequestEvent(TaskAttemptId attemptID, - Resource capability, - String[] hosts, String[] racks) { + public ContainerRequestEvent(TaskAttemptId attemptID, Resource capability, + String resourceProfile, String[] hosts, String[] racks) { super(attemptID, ContainerAllocator.EventType.CONTAINER_REQ); this.capability = capability; this.hosts = hosts; this.racks = racks; + this.resourceProfile = resourceProfile; + } + + public ContainerRequestEvent(TaskAttemptId attemptID, Resource capability, + String[] hosts, String[] racks) { + this(attemptID, capability, ProfileCapability.DEFAULT_PROFILE, + hosts, racks); } - ContainerRequestEvent(TaskAttemptId attemptID, Resource capability) { + ContainerRequestEvent(TaskAttemptId attemptID, Resource capability, + String profile) { this(attemptID, capability, new String[0], new String[0]); this.earlierAttemptFailed = true; + this.resourceProfile = profile; } - + public static ContainerRequestEvent createContainerRequestEventForFailedContainer( - TaskAttemptId attemptID, - Resource capability) { + TaskAttemptId attemptID, Resource capability, String resourceProfile) { //ContainerRequest for failed events does not consider rack / node locality? - return new ContainerRequestEvent(attemptID, capability); + return new ContainerRequestEvent(attemptID, capability, resourceProfile); } public Resource getCapability() { @@ -65,4 +74,13 @@ public Resource getCapability() { public boolean getEarlierAttemptFailed() { return earlierAttemptFailed; } + + /** + * Get the requested resource profile. + * + * @return the requested resource profile + */ + public String getResourceProfile() { + return resourceProfile; + } } \ No newline at end of file diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java index a7058e05846..7af290be469 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java @@ -22,6 +22,7 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.EnumSet; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -80,6 +81,7 @@ protected Map applicationACLs; private volatile long lastHeartbeatTime; private ConcurrentLinkedQueue heartbeatCallbacks; + private Map resourceProfilesMap = new HashMap<>(); private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); @@ -172,6 +174,7 @@ protected void register() { LOG.info("queue: " + queue); job.setQueueName(queue); this.schedulerResourceTypes.addAll(response.getSchedulerResourceTypes()); + resourceProfilesMap = response.getResourceProfiles(); } catch (Exception are) { LOG.error("Exception while registering", are); throw new YarnRuntimeException(are); @@ -353,4 +356,14 @@ protected boolean isApplicationMasterRegistered() { public EnumSet getSchedulerResourceTypes() { return schedulerResourceTypes; } + + /** + * Get a map of resource profile names to the resources that the profiles + * represent. + * + * @return the map of resource profiles + */ + public Map getResourceProfilesMap() { + return resourceProfilesMap; + } } diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 0dc7642418c..2e384212c09 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.PreemptionMessage; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ProfileCapability; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.ClientRMProxy; @@ -427,7 +428,7 @@ private void handleReduceContainerRequest(ContainerRequestEvent reqEvent) { JobId jobId = getJob().getID(); if (reduceResourceRequest.equals(Resources.none())) { - reduceResourceRequest = reqEvent.getCapability(); + reduceResourceRequest = getResourceProfile(reqEvent); eventHandler.handle(new JobHistoryEvent(jobId, new NormalizedResourceEvent( org.apache.hadoop.mapreduce.TaskType.REDUCE, @@ -471,6 +472,24 @@ private void handleReduceContainerRequest(ContainerRequestEvent reqEvent) { } } + private Resource getResourceProfile(ContainerRequestEvent reqEvent) + throws YarnRuntimeException { + if (!getResourceProfilesMap().isEmpty() && + !getResourceProfilesMap().containsKey(reqEvent.getResourceProfile())) { + throw new YarnRuntimeException( + "Unknown resource profile: '" + reqEvent.getResourceProfile() + + "'. The resource profiles are " + getResourceProfilesMap() + .keySet()); + } + + ProfileCapability profileCapability = + ProfileCapability.newInstance(reqEvent.getResourceProfile(), + reqEvent.getCapability()); + + return ProfileCapability.toResource(profileCapability, + getResourceProfilesMap()); + } + @SuppressWarnings({ "unchecked" }) private void handleMapContainerRequest(ContainerRequestEvent reqEvent) { assert(reqEvent.getAttemptID().getTaskId().getTaskType().equals( @@ -480,10 +499,9 @@ private void handleMapContainerRequest(ContainerRequestEvent reqEvent) { JobId jobId = getJob().getID(); if (mapResourceRequest.equals(Resources.none())) { - mapResourceRequest = reqEvent.getCapability(); + mapResourceRequest = getResourceProfile(reqEvent); eventHandler.handle(new JobHistoryEvent(jobId, - new NormalizedResourceEvent( - org.apache.hadoop.mapreduce.TaskType.MAP, + new NormalizedResourceEvent(org.apache.hadoop.mapreduce.TaskType.MAP, mapResourceRequest.getMemorySize()))); LOG.info("mapResourceRequest:" + mapResourceRequest); } @@ -1115,7 +1133,8 @@ void addMap(ContainerRequestEvent event) { new ContainerRequest(event, PRIORITY_OPPORTUNISTIC_MAP, mapNodeLabelExpression); maps.put(event.getAttemptID(), request); - addOpportunisticResourceRequest(request.priority, request.capability); + addOpportunisticResourceRequest(request.priority, request.capability, + request.resourceProfile); } else { request = new ContainerRequest(event, PRIORITY_MAP, mapNodeLabelExpression); diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java index f4579abd476..3d12a5226b9 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ProfileCapability; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -125,6 +126,7 @@ public RMContainerRequestor(ClientService clientService, AppContext context) { //final boolean earlierAttemptFailed; final Priority priority; final String nodeLabelExpression; + final String resourceProfile; /** * the time when this request object was formed; can be used to avoid @@ -134,26 +136,29 @@ public RMContainerRequestor(ClientService clientService, AppContext context) { public ContainerRequest(ContainerRequestEvent event, Priority priority, String nodeLabelExpression) { - this(event.getAttemptID(), event.getCapability(), event.getHosts(), - event.getRacks(), priority, nodeLabelExpression); + this(event.getAttemptID(), event.getCapability(), + event.getResourceProfile(), event.getHosts(), event.getRacks(), + priority, nodeLabelExpression); } public ContainerRequest(ContainerRequestEvent event, Priority priority, long requestTimeMs) { - this(event.getAttemptID(), event.getCapability(), event.getHosts(), - event.getRacks(), priority, requestTimeMs,null); + this(event.getAttemptID(), event.getCapability(), + event.getResourceProfile(), event.getHosts(), event.getRacks(), + priority, requestTimeMs, null); } - public ContainerRequest(TaskAttemptId attemptID, - Resource capability, String[] hosts, String[] racks, - Priority priority, String nodeLabelExpression) { - this(attemptID, capability, hosts, racks, priority, + public ContainerRequest(TaskAttemptId attemptID, Resource capability, + String resourceProfile, String[] hosts, String[] racks, + Priority priority, String nodeLabelExpression) { + this(attemptID, capability, resourceProfile, hosts, racks, priority, System.currentTimeMillis(), nodeLabelExpression); } public ContainerRequest(TaskAttemptId attemptID, - Resource capability, String[] hosts, String[] racks, - Priority priority, long requestTimeMs,String nodeLabelExpression) { + Resource capability, String resourceProfile, String[] hosts, + String[] racks, Priority priority, long requestTimeMs, + String nodeLabelExpression) { this.attemptID = attemptID; this.capability = capability; this.hosts = hosts; @@ -161,6 +166,7 @@ public ContainerRequest(TaskAttemptId attemptID, this.priority = priority; this.requestTimeMs = requestTimeMs; this.nodeLabelExpression = nodeLabelExpression; + this.resourceProfile = resourceProfile; } public String toString() { @@ -168,6 +174,11 @@ public String toString() { sb.append("AttemptId[").append(attemptID).append("]"); sb.append("Capability[").append(capability).append("]"); sb.append("Priority[").append(priority).append("]"); + sb.append("Hosts[").append(hosts).append("]"); + sb.append("Racks[").append(racks).append("]"); + sb.append("RequestTimeMs[").append(requestTimeMs).append("]"); + sb.append("NodeLabelExpression[").append(nodeLabelExpression).append("]"); + sb.append("ResourceProfile[").append(resourceProfile).append("]"); return sb.toString(); } } @@ -398,49 +409,49 @@ protected void addContainerReq(ContainerRequest req) { // Data-local if (!isNodeBlacklisted(host)) { addResourceRequest(req.priority, host, req.capability, - null); + null, req.resourceProfile); } } // Nothing Rack-local for now for (String rack : req.racks) { addResourceRequest(req.priority, rack, req.capability, - null); + null, req.resourceProfile); } // Off-switch addResourceRequest(req.priority, ResourceRequest.ANY, req.capability, - req.nodeLabelExpression); + req.nodeLabelExpression, req.resourceProfile); } protected void decContainerReq(ContainerRequest req) { // Update resource requests for (String hostName : req.hosts) { - decResourceRequest(req.priority, hostName, req.capability); + decResourceRequest(req.priority, hostName, req.capability, req.resourceProfile); } for (String rack : req.racks) { - decResourceRequest(req.priority, rack, req.capability); + decResourceRequest(req.priority, rack, req.capability, req.resourceProfile); } - decResourceRequest(req.priority, ResourceRequest.ANY, req.capability); + decResourceRequest(req.priority, ResourceRequest.ANY, req.capability, req.resourceProfile); } protected void addOpportunisticResourceRequest(Priority priority, - Resource capability) { + Resource capability, String profile) { addResourceRequest(priority, ResourceRequest.ANY, capability, null, - ExecutionType.OPPORTUNISTIC); + ExecutionType.OPPORTUNISTIC, profile); } private void addResourceRequest(Priority priority, String resourceName, - Resource capability, String nodeLabelExpression) { + Resource capability, String nodeLabelExpression, String profile) { addResourceRequest(priority, resourceName, capability, nodeLabelExpression, - ExecutionType.GUARANTEED); + ExecutionType.GUARANTEED, profile); } private void addResourceRequest(Priority priority, String resourceName, Resource capability, String nodeLabelExpression, - ExecutionType executionType) { + ExecutionType executionType, String profile) { Map> remoteRequests = this.remoteRequestsTable.get(priority); if (remoteRequests == null) { @@ -450,22 +461,27 @@ private void addResourceRequest(Priority priority, String resourceName, LOG.debug("Added priority=" + priority); } } + ProfileCapability profileCapability = + ProfileCapability.newInstance(profile, capability); + Resource reqCapability = ProfileCapability + .toResource(profileCapability, getResourceProfilesMap()); Map reqMap = remoteRequests.get(resourceName); if (reqMap == null) { reqMap = new HashMap(); remoteRequests.put(resourceName, reqMap); } - ResourceRequest remoteRequest = reqMap.get(capability); + ResourceRequest remoteRequest = reqMap.get(reqCapability); if (remoteRequest == null) { remoteRequest = recordFactory.newRecordInstance(ResourceRequest.class); remoteRequest.setPriority(priority); remoteRequest.setResourceName(resourceName); - remoteRequest.setCapability(capability); + remoteRequest.setProfileCapability(profileCapability); + remoteRequest.setCapability(reqCapability); remoteRequest.setNumContainers(0); remoteRequest.setNodeLabelExpression(nodeLabelExpression); remoteRequest.setExecutionTypeRequest( ExecutionTypeRequest.newInstance(executionType, true)); - reqMap.put(capability, remoteRequest); + reqMap.put(reqCapability, remoteRequest); } remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1); @@ -480,7 +496,11 @@ private void addResourceRequest(Priority priority, String resourceName, } private void decResourceRequest(Priority priority, String resourceName, - Resource capability) { + Resource capability, String resourceProfile) { + + Resource resource = ProfileCapability + .toResource(ProfileCapability.newInstance(resourceProfile, capability), + getResourceProfilesMap()); Map> remoteRequests = this.remoteRequestsTable.get(priority); Map reqMap = remoteRequests.get(resourceName); @@ -494,7 +514,8 @@ private void decResourceRequest(Priority priority, String resourceName, } return; } - ResourceRequest remoteRequest = reqMap.get(capability); + + ResourceRequest remoteRequest = reqMap.get(resource); if (LOG.isDebugEnabled()) { LOG.debug("BEFORE decResourceRequest:" + " applicationId=" @@ -510,7 +531,7 @@ private void decResourceRequest(Priority priority, String resourceName, } if (remoteRequest.getNumContainers() == 0) { - reqMap.remove(capability); + reqMap.remove(resource); if (reqMap.size() == 0) { remoteRequests.remove(resourceName); } @@ -558,8 +579,10 @@ protected ContainerRequest getFilteredContainerRequest(ContainerRequest orig) { } } String[] hosts = newHosts.toArray(new String[newHosts.size()]); - ContainerRequest newReq = new ContainerRequest(orig.attemptID, orig.capability, - hosts, orig.racks, orig.priority, orig.nodeLabelExpression); + ContainerRequest newReq = + new ContainerRequest(orig.attemptID, orig.capability, + orig.resourceProfile, hosts, orig.racks, orig.priority, + orig.nodeLabelExpression); return newReq; } diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index e4a8a1a90b9..34cb4dfb085 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -153,6 +153,7 @@ import org.junit.Test; import com.google.common.base.Supplier; +import org.apache.hadoop.yarn.api.records.ProfileCapability; import org.mockito.InOrder; @SuppressWarnings("unchecked") @@ -1878,7 +1879,7 @@ private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId, if (earlierFailedAttempt) { return ContainerRequestEvent .createContainerRequestEventForFailedContainer(attemptId, - containerNeed); + containerNeed, ProfileCapability.DEFAULT_PROFILE); } return new ContainerRequestEvent(attemptId, containerNeed, hosts, new String[] { NetworkTopology.DEFAULT_RACK }); diff --git hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 2023ba3b1d2..baea6d531e0 100644 --- hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -298,6 +298,9 @@ public static final String MAP_CPU_VCORES = "mapreduce.map.cpu.vcores"; public static final int DEFAULT_MAP_CPU_VCORES = 1; + public static final String MAP_RESOURCE_PROFILE = + "mapreduce.map.resource_profile"; + public static final String MAP_ENV = "mapreduce.map.env"; public static final String MAP_JAVA_OPTS = "mapreduce.map.java.opts"; @@ -352,6 +355,9 @@ public static final String REDUCE_CPU_VCORES = "mapreduce.reduce.cpu.vcores"; public static final int DEFAULT_REDUCE_CPU_VCORES = 1; + public static final String REDUCE_RESOURCE_PROFILE = + "mapreduce.reduce.resource_profile"; + public static final String REDUCE_MEMORY_TOTAL_BYTES = "mapreduce.reduce.memory.totalbytes"; public static final String SHUFFLE_INPUT_BUFFER_PERCENT = "mapreduce.reduce.shuffle.input.buffer.percent"; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java index 2cb46704716..39c10469e72 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ProfileCapability.java @@ -45,11 +45,13 @@ */ @InterfaceAudience.Public @InterfaceStability.Unstable -public abstract class ProfileCapability { - +public abstract class ProfileCapability implements Comparable { + public static final Resource NONE = Resource.newInstance(0L, 0); public static final String DEFAULT_PROFILE = "default"; public static ProfileCapability newInstance(Resource override) { + Preconditions + .checkArgument(override != null, "Override resource can't be null"); return newInstance(DEFAULT_PROFILE, override); } @@ -66,6 +68,8 @@ public static ProfileCapability newInstance(String profile, Resource override) { Preconditions .checkArgument(profile != null, "The profile name cannot be null"); + Preconditions + .checkArgument(override != null, "Override resource can't be null"); ProfileCapability obj = Records.newRecord(ProfileCapability.class); obj.setProfileName(profile); obj.setProfileCapabilityOverride(override); @@ -140,35 +144,56 @@ public String toString() { /** * Get a representation of the capability as a Resource object. + * {@code capability} and {@code resourceProfilesMap} must not be null. + * If {@code resourceProfilesMap} is empty, {@code capability} will be + * returned. If {@code profileName} is empty, the default profile will be + * used. + * * @param capability the capability we wish to convert * @param resourceProfilesMap map of profile name to Resource object * @return Resource object representing the capability */ public static Resource toResource(ProfileCapability capability, Map resourceProfilesMap) { - Preconditions - .checkArgument(capability != null, "Capability cannot be null"); + Preconditions.checkArgument(capability != null, + "Capability cannot be null"); Preconditions.checkArgument(resourceProfilesMap != null, "Resource profiles map cannot be null"); - Resource none = Resource.newInstance(0, 0); - Resource resource = Resource.newInstance(0, 0); + Resource resource; String profileName = capability.getProfileName(); + + if (resourceProfilesMap.isEmpty()) { + return capability.getProfileCapabilityOverride(); + } + if (profileName.isEmpty()) { profileName = DEFAULT_PROFILE; } + if (resourceProfilesMap.containsKey(profileName)) { resource = Resource.newInstance(resourceProfilesMap.get(profileName)); + } else { + resource = Resource.newInstance(0, 0); } - if (capability.getProfileCapabilityOverride() != null && - !capability.getProfileCapabilityOverride().equals(none)) { - for (ResourceInformation entry : capability - .getProfileCapabilityOverride().getResources()) { - if (entry != null && entry.getValue() >= 0) { - resource.setResourceInformation(entry.getName(), entry); - } - } + if ((capability.getProfileCapabilityOverride() != null) && + !NONE.equals(capability.getProfileCapabilityOverride())) { + Resource override = capability.getProfileCapabilityOverride(); + + resource.setMemorySize(override.getMemorySize()); + resource.setVirtualCores(override.getVirtualCores()); } + return resource; } + + @Override + public int compareTo(ProfileCapability other) { + int ret = this.getProfileName().compareTo(other.getProfileName()); + if (ret == 0) { + ret = this.getProfileCapabilityOverride() + .compareTo(other.getProfileCapabilityOverride()); + } + return ret; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java index 04579c53e93..103d54e225b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java @@ -19,13 +19,16 @@ package org.apache.hadoop.yarn.api.records; import java.util.Arrays; +import java.util.Set; import org.apache.commons.lang.NotImplementedException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Stable; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.records.impl.BaseResource; import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException; @@ -86,8 +89,32 @@ public static Resource newInstance(long memory, int vCores) { return new BaseResource(memory, vCores); } - @InterfaceAudience.Private - @InterfaceStability.Unstable + /** + * Create a new Resource object with the values specified for memory and + * vcores. Use the supplied default value for every other resource type. + * @param memory value for memory + * @param vCores value for vcores + * @param defaultValue default value for every other resource type + * @return + */ + @Public + @Unstable + public static Resource newInstance(long memory, int vCores, + long defaultValue) { + Resource ret = Records.newRecord(Resource.class); + ResourceInformation[] resources = ret.getResources(); + + for (int i = 2; i < resources.length; i++) { + ret.setResourceValue(i, defaultValue); + } + + ret.setMemorySize(memory); + ret.setVirtualCores(vCores); + return ret; + } + + @Private + @Unstable public static Resource newInstance(Resource resource) { Resource ret = Resource.newInstance(resource.getMemorySize(), resource.getVirtualCores()); diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java index c1339b00b13..e0a15424f1b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java @@ -636,6 +636,9 @@ public boolean equals(Object obj) { return false; } } + if (!getProfileCapability().equals(other.getProfileCapability())) { + return false; + } return true; } @@ -655,8 +658,14 @@ public int compareTo(ResourceRequest other) { int numContainerComparison = this.getNumContainers() - other.getNumContainers(); if (numContainerComparison == 0) { - return Long.compare(getAllocationRequestId(), - other.getAllocationRequestId()); + int profileComparison = this.getProfileCapability() + .compareTo(other.getProfileCapability()); + if (profileComparison == 0) { + return Long.compare(getAllocationRequestId(), + other.getAllocationRequestId()); + } else { + return profileComparison; + } } else { return numContainerComparison; } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index c167862e58e..7dc9cb70fec 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -89,6 +89,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ProfileCapability; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.UpdatedContainer; @@ -1507,7 +1508,8 @@ private ProfileCapability createProfileCapability() } Resource resourceCapability = - Resource.newInstance(containerMemory, containerVirtualCores); + Resource.newInstance(containerMemory, containerVirtualCores, -1); + if (resourceProfiles == null) { containerMemory = containerMemory == -1 ? DEFAULT_CONTAINER_MEMORY : containerMemory; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 1a973049fc7..8d7e3516e80 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -1007,7 +1007,7 @@ private void setAMResourceCapability(ApplicationSubmissionContext appContext, appContext.getAMContainerResourceRequests().get(0).setProfileCapability( ProfileCapability.newInstance(tmp, Resource.newInstance(0, 0))); } - Resource capability = Resource.newInstance(0, 0); + Resource capability = Resource.newInstance(0, 0, -1); // set amMemory because it's used to set Xmx param if (profiles == null) { amMemory = memory == -1 ? DEFAULT_AM_MEMORY : memory; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java index 2aae3a5c55e..6f8203d4994 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java @@ -646,12 +646,16 @@ public static void convertProfileToResourceCapability(ResourceRequest ask, ask.setCapability( ask.getProfileCapability().getProfileCapabilityOverride()); } - } else { - if (ask.getProfileCapability() != null) { - ask.setCapability(ProfileCapability - .toResource(ask.getProfileCapability(), - resourceProfilesManager.getResourceProfiles())); + } else if (ask.getProfileCapability() != null) { + if (resourceProfilesManager + .getProfile(ask.getProfileCapability().getProfileName()) == null) { + throw new YarnException( + "Unknown resource profile '" + ask.getProfileCapability() + .getProfileName() + "'."); } + ask.setCapability(ProfileCapability + .toResource(ask.getProfileCapability(), + resourceProfilesManager.getResourceProfiles())); } if (LOG_HANDLE.isDebugEnabled()) { LOG_HANDLE diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resource/TestResourceProfiles.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resource/TestResourceProfiles.java index a38f59bbe4d..b6b1b692537 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resource/TestResourceProfiles.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resource/TestResourceProfiles.java @@ -18,14 +18,24 @@ package org.apache.hadoop.yarn.server.resourcemanager.resource; +import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ProfileCapability; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.junit.Assert; import org.junit.Test; +import java.io.File; +import java.io.FileWriter; import java.io.IOException; +import java.io.PrintWriter; import java.util.HashMap; import java.util.Map; @@ -33,6 +43,8 @@ * Common test class for resource profile related tests. */ public class TestResourceProfiles { + public final static String TEST_DIR = + new File(System.getProperty("test.build.data", "/tmp")).getAbsolutePath(); @Test public void testProfilesEnabled() throws Exception { @@ -61,12 +73,9 @@ public void testProfilesEnabled() throws Exception { @Test public void testLoadProfiles() throws Exception { - ResourceProfilesManager manager = new ResourceProfilesManagerImpl(); - Configuration conf = new Configuration(); - conf.setBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED, true); - conf.set(YarnConfiguration.RM_RESOURCE_PROFILES_SOURCE_FILE, - "profiles/sample-profiles-1.json"); - manager.init(conf); + ResourceProfilesManager manager = + createAndInitResourceProfilesManager("profiles/sample-profiles-1.json"); + Map profiles = manager.getResourceProfiles(); Map expected = new HashMap<>(); expected.put("minimum", Resource.newInstance(1024, 1)); @@ -104,12 +113,8 @@ public void testLoadProfilesMissingMandatoryProfile() throws Exception { @Test public void testGetProfile() throws Exception { - Configuration conf = new Configuration(); - conf.setBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED, true); - ResourceProfilesManager manager = new ResourceProfilesManagerImpl(); - conf.set(YarnConfiguration.RM_RESOURCE_PROFILES_SOURCE_FILE, - "profiles/sample-profiles-2.json"); - manager.init(conf); + ResourceProfilesManager manager = + createAndInitResourceProfilesManager("profiles/sample-profiles-2.json"); Map expected = new HashMap<>(); expected.put("minimum", Resource.newInstance(1024, 1)); expected.put("default", Resource.newInstance(2048, 2)); @@ -124,16 +129,19 @@ public void testGetProfile() throws Exception { Assert.assertEquals("Profile " + profile + "' resources don't match", res, manager.getProfile(profile)); } + + try { + manager.getProfile("random"); + Assert.fail("Requesting an unknown profile should throw an exception"); + } catch (YarnException ex) { + // Success + } } @Test public void testGetMandatoryProfiles() throws Exception { - ResourceProfilesManager manager = new ResourceProfilesManagerImpl(); - Configuration conf = new Configuration(); - conf.setBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED, true); - conf.set(YarnConfiguration.RM_RESOURCE_PROFILES_SOURCE_FILE, - "profiles/sample-profiles-1.json"); - manager.init(conf); + ResourceProfilesManager manager = + createAndInitResourceProfilesManager("profiles/sample-profiles-1.json"); Map expected = new HashMap<>(); expected.put("minimum", Resource.newInstance(1024, 1)); expected.put("default", Resource.newInstance(2048, 2)); @@ -147,4 +155,65 @@ public void testGetMandatoryProfiles() throws Exception { expected.get("maximum"), manager.getMaximumProfile()); } + + @Test + public void testConvertProfileToResourceCapability() throws Exception { + Configuration conf = + createConfiguration("profiles/sample-profiles-3.json"); + + conf.set(YarnConfiguration.RESOURCE_TYPES, "resource1"); + conf.set(YarnConfiguration.RESOURCE_TYPES + ".resource1.units", "G"); + + try { + ResourceUtils.resetResourceTypes(conf); + ResourceProfilesManager manager = new ResourceProfilesManagerImpl(); + manager.init(conf); + Map expectedResultsMap = new HashMap<>(); + expectedResultsMap.put( + ProfileCapability.newInstance("small", Resource.newInstance(3072, 1)), + Resource.newInstance(3072, 1, 4)); + expectedResultsMap.put( + ProfileCapability.newInstance("small", Resource.newInstance(1024, 2)), + Resource.newInstance(1024, 2, 4)); + expectedResultsMap.put( + ProfileCapability.newInstance("small", Resource.newInstance(0, 0)), + Resource.newInstance(1024, 1, 4)); + + // override the extra resource, but it shouldn't apply + expectedResultsMap.put(ProfileCapability + .newInstance("small", Resource.newInstance(3072, 1, 2)), + Resource.newInstance(3072, 1, 4)); + + for (Map.Entry entry : expectedResultsMap + .entrySet()) { + ResourceRequest ask = ResourceRequest + .newInstance(Priority.UNDEFINED, "test", Resource.newInstance(0, 0), + 4, false, "*", ExecutionTypeRequest.newInstance(), + entry.getKey()); + RMServerUtils.convertProfileToResourceCapability(ask, conf, manager); + String message = + "Incorrect conversion, expected '" + entry.getValue() + "'; got '" + + ask.getCapability(); + Assert.assertEquals(message, entry.getValue(), ask.getCapability()); + } + } finally { + ResourceUtils.resetResourceTypes(null); + } + } + + private Configuration createConfiguration(String resourceProfilesFilename) { + Configuration conf = new Configuration(); + conf.setBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED, true); + conf.set(YarnConfiguration.RM_RESOURCE_PROFILES_SOURCE_FILE, + resourceProfilesFilename); + return conf; + } + + private ResourceProfilesManager createAndInitResourceProfilesManager( + String resourceProfilesFilename) throws Exception { + ResourceProfilesManager manager = new ResourceProfilesManagerImpl(); + Configuration conf = createConfiguration(resourceProfilesFilename); + manager.init(conf); + return manager; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/profiles/sample-profiles-3.json hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/profiles/sample-profiles-3.json new file mode 100644 index 00000000000..8b94a70b3e5 --- /dev/null +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/profiles/sample-profiles-3.json @@ -0,0 +1,32 @@ +{ + "minimum": { + "memory-mb" : 1024, + "vcores" : 1, + "resource1" : 0 + }, + "default" : { + "memory-mb" : 2048, + "vcores" : 2, + "resource1" : 0 + }, + "maximum" : { + "memory-mb": 4096, + "vcores" : 4, + "resource1" : "4G" + }, + "small" : { + "memory-mb": 1024, + "vcores": 1, + "resource1" : "4G" + }, + "medium" : { + "memory-mb": 2048, + "vcores": 1, + "resource1" : 0 + }, + "large": { + "memory-mb" : 4096, + "vcores" : 4, + "resource1" : "2G" + } +}