diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 3faad480b9d..6bd36740872 100755 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; @@ -123,6 +124,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; @@ -136,6 +138,7 @@ import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.RackResolver; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -664,12 +667,25 @@ public TaskAttemptImpl(TaskId taskId, int i, this.jobFile = jobFile; this.partition = partition; - //TODO:create the resource reqt for this Task attempt this.resourceCapability = recordFactory.newRecordInstance(Resource.class); this.resourceCapability.setMemorySize( getMemoryRequired(conf, taskId.getTaskType())); 
this.resourceCapability.setVirtualCores( getCpuRequired(conf, taskId.getTaskType())); + Optional<String> customResourceTypePrefix = + getCustomResourceTypePrefix(taskId.getTaskType()); + if (customResourceTypePrefix.isPresent()) { + List<ResourceInformation> resourceRequests = + ResourceUtils.getRequestedResourcesFromConfig(conf, + customResourceTypePrefix.get()); + for (ResourceInformation resourceRequest : resourceRequests) { + String resourceName = resourceRequest.getName(); + ResourceInformation resourceInformation = + this.resourceCapability.getResourceInformation(resourceName); + resourceInformation.setUnits(resourceRequest.getUnits()); + resourceInformation.setValue(resourceRequest.getValue()); + } + } this.dataLocalHosts = resolveHosts(dataLocalHosts); RackResolver.init(conf); @@ -705,6 +721,20 @@ private int getCpuRequired(Configuration conf, TaskType taskType) { return vcores; } + private Optional<String> getCustomResourceTypePrefix(TaskType taskType) { + switch (taskType) { + case MAP: + return Optional.of(MRJobConfig.MAP_CUSTOM_RESOURCE_TYPE_PREFIX); + case REDUCE: + return Optional.of(MRJobConfig.REDUCE_CUSTOM_RESOURCE_TYPE_PREFIX); + default: + LOG.info("TaskType " + taskType + + " does not support custom resource types - this support can be " + + "added in " + getClass().getSimpleName()); + return Optional.empty(); + } + } + /** * Create a {@link LocalResource} record with all the given parameters. 
*/ diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java index 7f187147c01..2a1aa2ba5d4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java @@ -71,6 +71,17 @@ public void initializeMemberVariables() { .add(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY); configurationPropsToSkipCompare .add(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY); + + // Resource type related properties are only prefixes, + // they need to be postfixed with the resource name + // in order to take effect. 
+ // There is nothing to be added to mapred-default.xml + configurationPropsToSkipCompare.add( + MRJobConfig.MR_AM_CUSTOM_RESOURCE_PREFIX); + configurationPropsToSkipCompare.add( + MRJobConfig.MAP_CUSTOM_RESOURCE_TYPE_PREFIX); + configurationPropsToSkipCompare.add( + MRJobConfig.REDUCE_CUSTOM_RESOURCE_TYPE_PREFIX); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java index 61b780ef315..ff3e3b0eaf6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java @@ -28,13 +28,19 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; +import org.junit.After; import org.junit.Assert; +import org.junit.BeforeClass; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -42,6 +48,7 @@ import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapTaskAttemptImpl; +import org.apache.hadoop.mapred.ReduceTaskAttemptImpl; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.JobCounter; import org.apache.hadoop.mapreduce.MRJobConfig; @@ -82,24 +89,30 @@ import 
org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.LocalConfigurationProvider; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.junit.Test; import org.mockito.ArgumentCaptor; @SuppressWarnings({"unchecked", "rawtypes"}) public class TestTaskAttempt{ - + + private static final String CUSTOM_RESOURCE_NAME = "a-custom-resource"; + static public class StubbedFS extends RawLocalFileSystem { @Override public FileStatus getFileStatus(Path f) throws IOException { @@ -107,6 +120,40 @@ public FileStatus getFileStatus(Path f) throws IOException { } } + private static class CustomResourceTypesConfigurationProvider + extends LocalConfigurationProvider { + + @Override + public InputStream getConfigurationInputStream(Configuration bootstrapConf, + String name) throws YarnException, IOException { + if (YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE.equals(name)) { + return new ByteArrayInputStream( + ("<configuration>\n" + + " <property>\n" + + " <name>yarn.resource-types</name>\n" + + " <value>a-custom-resource</value>\n" + + " </property>\n" + + " <property>\n" + + " <name>yarn.resource-types.a-custom-resource.units</name>\n" + + " <value>G</value>\n" + + " </property>\n" + + "</configuration>\n").getBytes()); + } else { + 
return super.getConfigurationInputStream(bootstrapConf, name); + } + } + } + + @BeforeClass + public static void setupBeforeClass() { + ResourceUtils.resetResourceTypes(new Configuration()); + } + + @After + public void tearDown() { + ResourceUtils.resetResourceTypes(new Configuration()); + } + @Test public void testMRAppHistoryForMap() throws Exception { MRApp app = new FailingAttemptsMRApp(1, 0); @@ -328,17 +375,18 @@ public void verifyMillisCounters(Resource containerResource, private TaskAttemptImpl createMapTaskAttemptImplForTest( EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo) { Clock clock = SystemClock.getInstance(); - return createMapTaskAttemptImplForTest(eventHandler, taskSplitMetaInfo, clock); + return createMapTaskAttemptImplForTest(eventHandler, taskSplitMetaInfo, + clock, new JobConf()); } private TaskAttemptImpl createMapTaskAttemptImplForTest( - EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo, Clock clock) { + EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo, + Clock clock, JobConf jobConf) { ApplicationId appId = ApplicationId.newInstance(1, 1); JobId jobId = MRBuilderUtils.newJobId(appId, 1); TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); TaskAttemptListener taListener = mock(TaskAttemptListener.class); Path jobFile = mock(Path.class); - JobConf jobConf = new JobConf(); TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, taskSplitMetaInfo, jobConf, taListener, null, @@ -346,6 +394,20 @@ private TaskAttemptImpl createMapTaskAttemptImplForTest( return taImpl; } + private TaskAttemptImpl createReduceTaskAttemptImplForTest( + EventHandler eventHandler, Clock clock, JobConf jobConf) { + ApplicationId appId = ApplicationId.newInstance(1, 1); + JobId jobId = MRBuilderUtils.newJobId(appId, 1); + TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.REDUCE); + TaskAttemptListener taListener = mock(TaskAttemptListener.class); + Path jobFile = 
mock(Path.class); + TaskAttemptImpl taImpl = + new ReduceTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1, + 1, jobConf, taListener, null, + null, clock, null); + return taImpl; + } + private void testMRAppHistory(MRApp app) throws Exception { Configuration conf = new Configuration(); Job job = app.submit(conf); @@ -1412,6 +1474,83 @@ public void testTimeoutWhileFailFinishing() throws Exception { assertFalse("InternalError occurred", eventHandler.internalError); } + @Test + public void testMapperCustomResourceTypes() { + initResourceTypes(); + EventHandler eventHandler = mock(EventHandler.class); + TaskSplitMetaInfo taskSplitMetaInfo = new TaskSplitMetaInfo(); + Clock clock = SystemClock.getInstance(); + JobConf jobConf = new JobConf(); + jobConf.setLong(MRJobConfig.MAP_CUSTOM_RESOURCE_TYPE_PREFIX + + CUSTOM_RESOURCE_NAME, 7L); + TaskAttemptImpl taImpl = createMapTaskAttemptImplForTest(eventHandler, + taskSplitMetaInfo, clock, jobConf); + ResourceInformation resourceInfo = + getResourceInfoFromContainerRequest(taImpl, eventHandler); + assertEquals("Expecting the default unit (G)", + "G", resourceInfo.getUnits()); + assertEquals(7L, resourceInfo.getValue()); + } + + @Test + public void testReducerCustomResourceTypes() { + initResourceTypes(); + EventHandler eventHandler = mock(EventHandler.class); + Clock clock = SystemClock.getInstance(); + JobConf jobConf = new JobConf(); + jobConf.set(MRJobConfig.REDUCE_CUSTOM_RESOURCE_TYPE_PREFIX + + CUSTOM_RESOURCE_NAME, "3m"); + TaskAttemptImpl taImpl = + createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf); + ResourceInformation resourceInfo = + getResourceInfoFromContainerRequest(taImpl, eventHandler); + assertEquals("Expecting the specified unit (m)", + "m", resourceInfo.getUnits()); + assertEquals(3L, resourceInfo.getValue()); + } + + private ResourceInformation getResourceInfoFromContainerRequest( + TaskAttemptImpl taImpl, EventHandler eventHandler) { + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + 
TaskAttemptEventType.TA_SCHEDULE)); + + assertEquals("Task attempt is not in STARTING state", taImpl.getState(), + TaskAttemptState.STARTING); + + ArgumentCaptor<Event> captor = ArgumentCaptor.forClass(Event.class); + verify(eventHandler, times(2)).handle(captor.capture()); + + List<ContainerRequestEvent> containerRequestEvents = new ArrayList<>(); + for (Event e : captor.getAllValues()) { + if (e instanceof ContainerRequestEvent) { + containerRequestEvents.add((ContainerRequestEvent) e); + } + } + assertEquals("Expected one ContainerRequestEvent after scheduling " + + "task attempt", 1, containerRequestEvents.size()); + + Resource capability = containerRequestEvents.get(0).getCapability(); + return capability.getResourceInformation(CUSTOM_RESOURCE_NAME); + } + + @Test(expected=IllegalArgumentException.class) + public void testReducerCustomResourceTypeWithInvalidUnit() { + initResourceTypes(); + EventHandler eventHandler = mock(EventHandler.class); + Clock clock = SystemClock.getInstance(); + JobConf jobConf = new JobConf(); + jobConf.set(MRJobConfig.REDUCE_CUSTOM_RESOURCE_TYPE_PREFIX + + CUSTOM_RESOURCE_NAME, "3z"); + createReduceTaskAttemptImplForTest(eventHandler, clock, jobConf); + } + + private void initResourceTypes() { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, + CustomResourceTypesConfigurationProvider.class.getName()); + ResourceUtils.resetResourceTypes(conf); + } + private void setupTaskAttemptFinishingMonitor( EventHandler eventHandler, JobConf jobConf, AppContext appCtx) { TaskAttemptFinishingMonitor taskAttemptFinishingMonitor = diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index cf597301c63..4c2cd02c131 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -298,6 +298,14 @@ public static final String MAP_CPU_VCORES = "mapreduce.map.cpu.vcores"; public static final int DEFAULT_MAP_CPU_VCORES = 1; + /** + * Custom resource names required by the mapper should be + * appended to this prefix, the value's format is {amount}[{unit}]. + * If no unit is defined, the default unit will be used + */ + public static final String MAP_CUSTOM_RESOURCE_TYPE_PREFIX = + "mapreduce.map.resource."; + public static final String MAP_ENV = "mapreduce.map.env"; public static final String MAP_JAVA_OPTS = "mapreduce.map.java.opts"; @@ -352,6 +360,14 @@ public static final String REDUCE_CPU_VCORES = "mapreduce.reduce.cpu.vcores"; public static final int DEFAULT_REDUCE_CPU_VCORES = 1; + /** + * Custom resource names required by the reducer should be + * appended to this prefix, the value's format is {amount}[{unit}]. + * If no unit is defined, the default unit will be used + */ + public static final String REDUCE_CUSTOM_RESOURCE_TYPE_PREFIX = + "mapreduce.reduce.resource."; + public static final String REDUCE_MEMORY_TOTAL_BYTES = "mapreduce.reduce.memory.totalbytes"; public static final String SHUFFLE_INPUT_BUFFER_PERCENT = "mapreduce.reduce.shuffle.input.buffer.percent"; @@ -547,6 +563,14 @@ MR_AM_PREFIX+"resource.cpu-vcores"; public static final int DEFAULT_MR_AM_CPU_VCORES = 1; + /** + * Custom resource names required by the MR AM should be + * appended to this prefix, the value's format is {amount}[{unit}]. 
+ * If no unit is defined, the default unit will be used + */ + public static final String MR_AM_CUSTOM_RESOURCE_PREFIX = + MR_AM_PREFIX+"resource."; + /** Command line arguments passed to the MR app master.*/ public static final String MR_AM_COMMAND_OPTS = MR_AM_PREFIX+"command-opts"; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index 1baa467cdeb..a782d43033e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -18,6 +18,8 @@ package org.apache.hadoop.mapred; +import static org.apache.hadoop.mapreduce.MRJobConfig.MR_AM_CUSTOM_RESOURCE_PREFIX; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -82,6 +84,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -91,6 +94,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.client.RMDelegationTokenSelector; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; import com.google.common.annotations.VisibleForTesting; @@ -631,6 +635,21 @@ public ApplicationSubmissionContext createApplicationSubmissionContext( MRJobConfig.MR_AM_CPU_VCORES, 
MRJobConfig.DEFAULT_MR_AM_CPU_VCORES ) ); + List<ResourceInformation> resourceRequests = ResourceUtils + .getRequestedResourcesFromConfig(conf, MR_AM_CUSTOM_RESOURCE_PREFIX); + for (ResourceInformation resourceReq : resourceRequests) { + String resourceName = resourceReq.getName(); + // filter out clashing config keys: MR_AM_VMEM_MB and MR_AM_CPU_VCORES + if (MRJobConfig.MR_AM_VMEM_MB.equals(MR_AM_CUSTOM_RESOURCE_PREFIX + + resourceName) || MRJobConfig.MR_AM_CPU_VCORES.equals( + MR_AM_CUSTOM_RESOURCE_PREFIX + resourceName)) { + continue; + } + ResourceInformation resourceInformation = capability + .getResourceInformation(resourceName); + resourceInformation.setUnits(resourceReq.getUnits()); + resourceInformation.setValue(resourceReq.getValue()); + } if (LOG.isDebugEnabled()) { LOG.debug("AppMaster capability = " + capability); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java index 55ddea6da38..fbe8b8bd7f2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java @@ -33,10 +33,12 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -69,6 +71,7 @@ import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import 
org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.LocalConfigurationProvider; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; @@ -96,16 +99,19 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Records; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.log4j.Appender; import org.apache.log4j.Layout; import org.apache.log4j.Logger; @@ -114,6 +120,7 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -131,6 +138,30 @@ MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS.substring(0, MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS.lastIndexOf("%")); + private static class CustomResourceTypesConfigurationProvider + extends LocalConfigurationProvider { + + @Override + public InputStream getConfigurationInputStream(Configuration bootstrapConf, + String name) throws YarnException, IOException { + if 
(YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE.equals(name)) { + return new ByteArrayInputStream( + ("<configuration>\n" + + " <property>\n" + + " <name>yarn.resource-types</name>\n" + + " <value>a-custom-resource</value>\n" + + " </property>\n" + + " <property>\n" + + " <name>yarn.resource-types.a-custom-resource.units</name>\n" + + " <value>G</value>\n" + + " </property>\n" + + "</configuration>\n").getBytes()); + } else { + return super.getConfigurationInputStream(bootstrapConf, name); + } + } + } + private YARNRunner yarnRunner; private ResourceMgrDelegate resourceMgrDelegate; private YarnConfiguration conf; @@ -143,6 +174,11 @@ private ClientServiceDelegate clientDelegate; private static final String failString = "Rejected job"; + @BeforeClass + public static void setupBeforeClass() { + ResourceUtils.resetResourceTypes(new Configuration()); + } + @Before public void setUp() throws Exception { resourceMgrDelegate = mock(ResourceMgrDelegate.class); @@ -175,6 +211,7 @@ public ApplicationSubmissionContext answer(InvocationOnMock invocation) @After public void cleanup() { FileUtil.fullyDelete(testWorkDir); + ResourceUtils.resetResourceTypes(new Configuration()); } @Test(timeout=20000) @@ -881,4 +918,40 @@ public void testSendJobConf() throws IOException { .get("hadoop.tmp.dir").equals("testconfdir")); UserGroupInformation.reset(); } + + @Test + public void testCustomAMResourceType() throws Exception { + initResourceTypes(); + String customResourceName = "a-custom-resource"; + + JobConf jobConf = new JobConf(); + + jobConf.setInt(MRJobConfig.MR_AM_CUSTOM_RESOURCE_PREFIX + + customResourceName, 5); + jobConf.setInt(MRJobConfig.MR_AM_CPU_VCORES, 3); + + yarnRunner = new YARNRunner(jobConf); + + submissionContext = buildSubmitContext(yarnRunner, jobConf); + + List<ResourceRequest> resourceRequests = + submissionContext.getAMContainerResourceRequests(); + + Assert.assertEquals(1, resourceRequests.size()); + ResourceRequest resourceRequest = resourceRequests.get(0); + + ResourceInformation resourceInformation = resourceRequest.getCapability() + .getResourceInformation(customResourceName); + 
Assert.assertEquals("Expecting the default unit (G)", + "G", resourceInformation.getUnits()); + Assert.assertEquals(5L, resourceInformation.getValue()); + Assert.assertEquals(3, resourceRequest.getCapability().getVirtualCores()); + } + + private void initResourceTypes() { + Configuration configuration = new Configuration(); + configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, + CustomResourceTypesConfigurationProvider.class.getName()); + ResourceUtils.resetResourceTypes(configuration); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java index 0564d749bbc..669d1b30c32 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceUtils.java @@ -22,8 +22,6 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes; import org.apache.hadoop.yarn.api.records.Resource; @@ -44,7 +42,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; /** * Helper class to read the resource-types to be supported by the system. 
@@ -58,6 +59,8 @@ private static final String MEMORY = ResourceInformation.MEMORY_MB.getName(); private static final String VCORES = ResourceInformation.VCORES.getName(); + private static final Pattern RESOURCE_REQUEST_VALUE_PATTERN = + Pattern.compile("^([0-9]+)([a-zA-Z]*)$"); private static volatile boolean initializedResources = false; private static final Map<String, Integer> RESOURCE_NAME_TO_INDEX = @@ -573,4 +576,43 @@ public static String getDefaultUnit(String resourceType) { } return array; } + + /** + * From a given configuration get all entries representing requested + * resources: entries that match the {prefix}{resourceName}={value}[{units}] + * pattern. + * @param configuration The configuration + * @param prefix Keys with this prefix are considered from the configuration + * @return The list of requested resources as described by the configuration + */ + public static List<ResourceInformation> getRequestedResourcesFromConfig( + Configuration configuration, String prefix) { + List<ResourceInformation> result = new ArrayList<>(); + Map<String, String> customResourcesMap = configuration + .getValByRegex("^" + Pattern.quote(prefix) + "[^.]+$"); + for (Entry<String, String> resource : customResourcesMap.entrySet()) { + String resourceName = resource.getKey().substring(prefix.length()); + Matcher matcher = + RESOURCE_REQUEST_VALUE_PATTERN.matcher(resource.getValue()); + if (!matcher.matches()) { + String errorMsg = "Invalid resource request specified for property " + + resource.getKey() + ": " + resource.getValue() + + ", expected format is: <amount><unit>"; + LOG.error(errorMsg); + throw new IllegalArgumentException(errorMsg); + } + long value = Long.parseLong(matcher.group(1)); + String unit = matcher.group(2); + if (unit.isEmpty()) { + unit = ResourceUtils.getDefaultUnit(resourceName); + } + ResourceInformation resourceInformation = new ResourceInformation(); + resourceInformation.setName(resourceName); + resourceInformation.setValue(value); + resourceInformation.setUnits(unit); + result.add(resourceInformation); + } + return result; + } + }