diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java index 79f9f3a..33daf28 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/RegisterApplicationMasterResponse.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.api.protocolrecords; import java.nio.ByteBuffer; +import java.util.EnumSet; import java.util.List; import java.util.Map; @@ -31,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.util.Records; /** @@ -180,4 +182,25 @@ public abstract void setContainersFromPreviousAttempts( @Private @Unstable public abstract void setNMTokensFromPreviousAttempts(List nmTokens); + + /** + * Get a set of the resource types considered by the scheduler. + * + * @return a Map of RM settings + */ + @Public + @Unstable + public abstract EnumSet getSchedulerResourceTypes(); + + /** + * Set the resource types used by the scheduler. + * + * @param types + * a set of the resource types that the scheduler considers during + * scheduling + */ + @Private + @Unstable + public abstract void setSchedulerResourceTypes( + EnumSet types); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index df8784b..4203744 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -47,6 +47,7 @@ message RegisterApplicationMasterResponseProto { repeated ContainerProto containers_from_previous_attempts = 4; optional string queue = 5; repeated NMTokenProto nm_tokens_from_previous_attempts = 6; + repeated SchedulerResourceTypes scheduler_resource_types = 7; } message FinishApplicationMasterRequestProto { @@ -88,6 +89,11 @@ message AllocateResponseProto { optional hadoop.common.TokenProto am_rm_token = 12; } +enum SchedulerResourceTypes { + MEMORY = 0; + CPU = 1; +} + ////////////////////////////////////////////////////// /////// client_RM_Protocol /////////////////////////// ////////////////////////////////////////////////////// diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java index 06a637a..32dc85d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/RegisterApplicationMasterResponsePBImpl.java @@ -20,11 +20,7 @@ import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; +import java.util.*; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -43,6 +39,7 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.NMTokenProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import com.google.protobuf.ByteString; import com.google.protobuf.TextFormat; @@ -61,6 +58,7 @@ private Map applicationACLS = null; private List containersFromPreviousAttempts = null; private List nmTokens = null; + private EnumSet schedulerResourceTypes = null; public RegisterApplicationMasterResponsePBImpl() { builder = RegisterApplicationMasterResponseProto.newBuilder(); @@ -122,6 +120,9 @@ private void mergeLocalToBuilder() { Iterable iterable = getTokenProtoIterable(nmTokens); builder.addAllNmTokensFromPreviousAttempts(iterable); } + if(schedulerResourceTypes != null) { + addSchedulerResourceTypes(); + } } @@ -364,6 +365,73 @@ public void remove() { }; } + @Override + public EnumSet getSchedulerResourceTypes() { + initSchedulerResourceTypes(); + return this.schedulerResourceTypes; + } + + private void initSchedulerResourceTypes() { + if (this.schedulerResourceTypes != null) { + return; + } + RegisterApplicationMasterResponseProtoOrBuilder p = + viaProto ? proto : builder; + + List list = p.getSchedulerResourceTypesList(); + if (list.isEmpty()) { + this.schedulerResourceTypes = + EnumSet.noneOf(SchedulerResourceTypes.class); + } else { + this.schedulerResourceTypes = EnumSet.copyOf(list); + } + } + + private void addSchedulerResourceTypes() { + maybeInitBuilder(); + builder.clearSchedulerResourceTypes(); + if (schedulerResourceTypes == null) { + return; + } + Iterable values = + new Iterable() { + + @Override + public Iterator iterator() { + return new Iterator() { + Iterator settingsIterator = + schedulerResourceTypes.iterator(); + + @Override + public boolean hasNext() { + return settingsIterator.hasNext(); + } + + @Override + public SchedulerResourceTypes next() { + return settingsIterator.next(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + this.builder.addAllSchedulerResourceTypes(values); + } + + @Override + public void setSchedulerResourceTypes(EnumSet types) { + if (types == null) { + return; + } + initSchedulerResourceTypes(); + this.schedulerResourceTypes.clear(); + this.schedulerResourceTypes.addAll(types); + } + private Resource convertFromProtoFormat(ResourceProto resource) { return new ResourcePBImpl(resource); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index d77180c..e6d878a 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -22,11 +22,7 @@ import java.io.InputStream; import java.net.InetSocketAddress; import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -329,6 +325,10 @@ public RegisterApplicationMasterResponse registerApplicationMaster( + transferredContainers.size() + " containers from previous" + " attempts and " + nmTokens.size() + " NM tokens."); } + + response.setSchedulerResourceTypes(rScheduler + .getSchedulingResourceTypes()); + return response; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index ee5dcbe..0b5447b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -19,13 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.Timer; -import java.util.TimerTask; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; @@ -45,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; @@ -502,4 +497,10 @@ public synchronized void updateNodeResource(RMNode nm, + " with the same resource: " + newResource); } } + + /** {@inheritDoc} */ + @Override + public EnumSet getSchedulingResourceTypes() { + return EnumSet.of(SchedulerResourceTypes.MEMORY); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java index 5ce16c2..b6c1018 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import java.io.IOException; +import java.util.EnumSet; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; @@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; /** * This interface is used by the components to talk to the @@ -220,4 +222,12 @@ public String moveApplication(ApplicationId appId, String newQueue) * @throws YarnException */ void killAllAppsInQueue(String queueName) throws YarnException; + + /** + * Return a collection of the resource types that are considered when + * scheduling + * + * @return an EnumSet containing the resource types + */ + public EnumSet getSchedulingResourceTypes(); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index a8ef942..6b810d7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -20,13 +20,7 @@ import java.io.IOException; import java.io.InputStream; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Random; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -50,12 +44,12 @@ import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.*; @@ -89,6 +83,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.Lock; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -1285,4 +1280,15 @@ private LeafQueue getAndCheckLeafQueue(String queue) throws YarnException { } return (LeafQueue) ret; } + + /** {@inheritDoc} */ + @Override + public EnumSet getSchedulingResourceTypes() { + if (calculator.getClass().getName() + .equals(DefaultResourceCalculator.class.getName())) { + return EnumSet.of(SchedulerResourceTypes.MEMORY); + } + return EnumSet + .of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index a35e49f..9c40d48 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -19,13 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Set; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; @@ -50,6 +44,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; @@ -1529,4 +1524,11 @@ public synchronized void updateNodeResource(RMNode nm, queueMgr.getRootQueue().setSteadyFairShare(clusterResource); queueMgr.getRootQueue().recomputeSteadyShares(); } + + /** {@inheritDoc} */ + @Override + public EnumSet getSchedulingResourceTypes() { + return EnumSet + .of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU); + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java index b0ffc85..3508a3c 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java @@ -18,6 +18,13 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.junit.Assert; import org.apache.commons.logging.Log; @@ -40,8 +47,7 @@ import org.junit.BeforeClass; import org.junit.Test; -import java.util.ArrayList; -import java.util.List; +import java.util.*; import static java.lang.Thread.sleep; import static org.mockito.Matchers.any; @@ -259,4 +265,47 @@ public void testFinishApplicationMasterBeforeRegistering() throws Exception { } } } + + @Test(timeout = 3000000) + public void testResourceTypes() throws Exception { + HashMap> driver = + new HashMap>(); + + CapacitySchedulerConfiguration csconf = + new CapacitySchedulerConfiguration(); + csconf.setResourceComparator(DominantResourceCalculator.class); + YarnConfiguration testCapacityDRConf = new YarnConfiguration(csconf); + testCapacityDRConf.setClass(YarnConfiguration.RM_SCHEDULER, + CapacityScheduler.class, ResourceScheduler.class); + YarnConfiguration testCapacityDefConf = new YarnConfiguration(); + testCapacityDefConf.setClass(YarnConfiguration.RM_SCHEDULER, + CapacityScheduler.class, ResourceScheduler.class); + YarnConfiguration testFairDefConf = new YarnConfiguration(); + testFairDefConf.setClass(YarnConfiguration.RM_SCHEDULER, + FairScheduler.class, ResourceScheduler.class); + + driver.put(conf, EnumSet.of(SchedulerResourceTypes.MEMORY)); + driver.put(testCapacityDRConf, + EnumSet.of(SchedulerResourceTypes.CPU, SchedulerResourceTypes.MEMORY)); + driver.put(testCapacityDefConf, EnumSet.of(SchedulerResourceTypes.MEMORY)); + driver.put(testFairDefConf, + EnumSet.of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU)); + + for (Map.Entry> entry : driver + .entrySet()) { + EnumSet expectedValue = entry.getValue(); + MockRM rm = new MockRM(entry.getKey()); + rm.start(); + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB); + RMApp app1 = rm.submitApp(2048); + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + RegisterApplicationMasterResponse resp = am1.registerAppAttempt(); + EnumSet types = resp.getSchedulerResourceTypes(); + LOG.info("types = " + types.toString()); + Assert.assertEquals(expectedValue, types); + rm.stop(); + } + } }