diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java index 70a6496..4705f45 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java @@ -27,7 +27,9 @@ import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; @@ -79,6 +81,10 @@ private final Map schedulingPolicies; private final SchedulingPolicy defaultSchedulingPolicy; + + // Which types of resources are used to calculate DRF + private final Set drfPolicyResourcesDefault; + private final Map> drfPolicyResourcesMap; // Policy for mapping apps to queues @VisibleForTesting @@ -96,6 +102,8 @@ public AllocationConfiguration(Map minQueueResources, int queueMaxAppsDefault, float queueMaxAMShareDefault, Map schedulingPolicies, SchedulingPolicy defaultSchedulingPolicy, + Map> drfPolicyResourcesMap, + Set drfPolicyResourcesDefault, Map minSharePreemptionTimeouts, Map fairSharePreemptionTimeouts, Map fairSharePreemptionThresholds, @@ -112,6 +120,8 @@ public AllocationConfiguration(Map minQueueResources, this.queueMaxAppsDefault = queueMaxAppsDefault; this.queueMaxAMShareDefault = queueMaxAMShareDefault; this.defaultSchedulingPolicy = defaultSchedulingPolicy; + this.drfPolicyResourcesMap = drfPolicyResourcesMap; + this.drfPolicyResourcesDefault = drfPolicyResourcesDefault; this.schedulingPolicies = schedulingPolicies; this.minSharePreemptionTimeouts = minSharePreemptionTimeouts; this.fairSharePreemptionTimeouts = fairSharePreemptionTimeouts; @@ -143,6 +153,10 @@ public AllocationConfiguration(Configuration conf) { } placementPolicy = QueuePlacementPolicy.fromConfiguration(conf, configuredQueues); + drfPolicyResourcesDefault = new HashSet(); + drfPolicyResourcesDefault.add(ResourceType.CPU); + drfPolicyResourcesDefault.add(ResourceType.MEMORY); + drfPolicyResourcesMap = new HashMap>(); } /** @@ -245,10 +259,20 @@ public boolean hasAccess(String queueName, QueueACL acl, return false; } - - public SchedulingPolicy getSchedulingPolicy(String queueName) { - SchedulingPolicy policy = schedulingPolicies.get(queueName); - return (policy == null) ? defaultSchedulingPolicy : policy; + + public SchedulingPolicy getSchedulingPolicy(String queueName) + throws AllocationConfigurationException { + SchedulingPolicy policy = schedulingPolicies.containsKey(queueName) ? + schedulingPolicies.get(queueName) : defaultSchedulingPolicy; + if (policy instanceof DominantResourceFairnessPolicy) { + Set enabledResources = getDRFPolicyEnabledResources(queueName); + if (enabledResources == null || enabledResources.isEmpty()) { + enabledResources = drfPolicyResourcesDefault; + } + ((DominantResourceFairnessPolicy) policy) + .setEnabledResourceTypes(enabledResources); + } + return policy; } public SchedulingPolicy getDefaultSchedulingPolicy() { @@ -262,4 +286,9 @@ public SchedulingPolicy getDefaultSchedulingPolicy() { public QueuePlacementPolicy getPlacementPolicy() { return placementPolicy; } + + private Set getDRFPolicyEnabledResources(String queueName) { + return drfPolicyResourcesMap.containsKey(queueName) ? + drfPolicyResourcesMap.get(queueName) : drfPolicyResourcesDefault; + } } \ No newline at end of file diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index e0e23e0..867ae3d 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -40,6 +40,7 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; @@ -216,6 +217,8 @@ public synchronized void reloadAllocations() throws IOException, Map queueMaxAMShares = new HashMap(); Map queueWeights = new HashMap(); Map queuePolicies = new HashMap(); + Map> drfPolicyResourcesMap = + new HashMap>(); Map minSharePreemptionTimeouts = new HashMap(); Map fairSharePreemptionTimeouts = new HashMap(); Map fairSharePreemptionThresholds = @@ -229,6 +232,9 @@ public synchronized void reloadAllocations() throws IOException, long defaultMinSharePreemptionTimeout = Long.MAX_VALUE; float defaultFairSharePreemptionThreshold = 0.5f; SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY; + Set drfPolicyResourcesDefault = new HashSet(); + drfPolicyResourcesDefault.add(ResourceType.CPU); + drfPolicyResourcesDefault.add(ResourceType.MEMORY); QueuePlacementPolicy newPlacementPolicy = null; @@ -317,6 +323,13 @@ public synchronized void reloadAllocations() throws IOException, defaultSchedPolicy = SchedulingPolicy.parse(text); } else if ("queuePlacementPolicy".equals(element.getTagName())) { placementPolicyElement = element; + } else if ("DRFPolicyResourcesDefault".equals(element.getTagName())) { + String text = ((Text)element.getFirstChild()).getData().trim(); + drfPolicyResourcesDefault = getResourceTypes(text); + if (drfPolicyResourcesDefault.isEmpty()) { + throw new AllocationConfigurationException("The default DRF policy " + + "resources can not be null."); + } } else { LOG.warn("Bad element in allocations file: " + element.getTagName()); } @@ -336,8 +349,9 @@ public synchronized void reloadAllocations() throws IOException, } loadQueue(parent, element, minQueueResources, maxQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, - queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, - fairSharePreemptionThresholds, queueAcls, configuredQueues); + queuePolicies, drfPolicyResourcesMap, minSharePreemptionTimeouts, + fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls, + configuredQueues); } // Load placement policy and pass it configured queues @@ -370,6 +384,7 @@ public synchronized void reloadAllocations() throws IOException, maxQueueResources, queueMaxApps, userMaxApps, queueWeights, queueMaxAMShares, userMaxAppsDefault, queueMaxAppsDefault, queueMaxAMShareDefault, queuePolicies, defaultSchedPolicy, + drfPolicyResourcesMap, drfPolicyResourcesDefault, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls, newPlacementPolicy, configuredQueues); @@ -389,6 +404,7 @@ private void loadQueue(String parentName, Element element, Map userMaxApps, Map queueMaxAMShares, Map queueWeights, Map queuePolicies, + Map> drfResourcesMap, Map minSharePreemptionTimeouts, Map fairSharePreemptionTimeouts, Map fairSharePreemptionThresholds, @@ -464,11 +480,17 @@ private void loadQueue(String parentName, Element element, "pool".equals(field.getTagName())) { loadQueue(queueName, field, minQueueResources, maxQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, - queuePolicies, minSharePreemptionTimeouts, + queuePolicies, drfResourcesMap, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls, configuredQueues); configuredQueues.get(FSQueueType.PARENT).add(queueName); isLeaf = false; + } else if ("DRFPolicyResources".equals(field.getTagName())) { + String text = ((Text) field.getFirstChild()).getData(); + Set < ResourceType > drfResources = getResourceTypes(text); + if (drfResources != null && !drfResources.isEmpty()) { + drfResourcesMap.put(queueName, drfResources); + } } } if (isLeaf) { @@ -496,4 +518,16 @@ private void loadQueue(String parentName, Element element, public interface Listener { public void onReload(AllocationConfiguration info); } + + protected Set getResourceTypes(String value) { + Set resourceTypes = new HashSet(); + String[] types = value.split(","); + for (String type : types) { + try { + ResourceType rType = ResourceType.valueOf(type.trim()); + resourceTypes.add(rType); + } catch (Exception e) {} + } + return resourceTypes; + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index a687e71..fa2d04b 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -231,6 +231,24 @@ private void validateConf(Configuration conf) { + "=" + maxVcores + ", min should equal greater than 0" + ", max should be no smaller than min."); } + + // validate scheduler vdisks allocation setting + int minVdisks = conf.getInt( + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_DISK_VDISKS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_DISK_VDISKS); + int maxVdisks = conf.getInt( + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_DISK_VDISKS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_DISK_VDISKS); + + if (minVdisks < 0 || minVdisks > maxVdisks) { + throw new YarnRuntimeException("Invalid resource scheduler vdisks" + + " allocation configuration" + + ", " + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_DISK_VDISKS + + "=" + minVdisks + + ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_DISK_VDISKS + + "=" + maxVdisks + ", min should equal greater than 0" + + ", max should be no smaller than min."); + } } public FairSchedulerConfiguration getConf() { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java index 32ef906..b537399 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java @@ -48,7 +48,10 @@ public static final String RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES = YarnConfiguration.YARN_PREFIX + "scheduler.increment-allocation-vcores"; public static final int DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES = 1; - + public static final String RM_SCHEDULER_INCREMENT_ALLOCATION_VDISKS = + YarnConfiguration.YARN_PREFIX + "scheduler.increment-allocation-disk-vdisks"; + public static final int DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_DISK_VDISKS = 1; + private static final String CONF_PREFIX = "yarn.scheduler.fair."; public static final String ALLOCATION_FILE = CONF_PREFIX + "allocation.file"; @@ -143,7 +146,10 @@ public Resource getMinimumAllocation() { int cpu = getInt( YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES); - return Resources.createResource(mem, cpu); + int vdisks = getInt( + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_DISK_VDISKS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_DISK_VDISKS); + return Resources.createResource(mem, cpu, vdisks); } public Resource getMaximumAllocation() { @@ -153,7 +159,10 @@ public Resource getMaximumAllocation() { int cpu = getInt( YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); - return Resources.createResource(mem, cpu); + int vdisks = getInt( + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_DISK_VDISKS, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_DISK_VDISKS); + return Resources.createResource(mem, cpu, vdisks); } public Resource getIncrementAllocation() { @@ -163,7 +172,11 @@ public Resource getIncrementAllocation() { int incrementCores = getInt( RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES); - return Resources.createResource(incrementMemory, incrementCores); + int incrementVdisks = getInt( + RM_SCHEDULER_INCREMENT_ALLOCATION_VDISKS, + DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_DISK_VDISKS); + return Resources.createResource( + incrementMemory, incrementCores, incrementVdisks); } public float getLocalityThresholdNode() { @@ -234,7 +247,8 @@ public boolean getUsePortForNodeName() { /** * Parses a resource config value of a form like "1024", "1024 mb", - * or "1024 mb, 3 vcores". If no units are given, megabytes are assumed. + * "1024 mb, 3 vcores", or "1024mb, 3 vcores, 1 vdisks". If no units are + * given, megabytes are assumed. * * @throws AllocationConfigurationException */ @@ -244,7 +258,8 @@ public static Resource parseResourceConfigValue(String val) val = val.toLowerCase(); int memory = findResource(val, "mb"); int vcores = findResource(val, "vcores"); - return BuilderUtils.newResource(memory, vcores); + int vdisks = findResource(val, "vdisks"); + return Resource.newInstance(memory, vcores, vdisks); } catch (AllocationConfigurationException ex) { throw ex; } catch (Exception ex) { @@ -261,9 +276,11 @@ private static int findResource(String val, String units) throws AllocationConfigurationException { Pattern pattern = Pattern.compile("(\\d+)\\s*" + units); Matcher matcher = pattern.matcher(val); - if (!matcher.find()) { + + if (!matcher.find() && !units.equals("vdisks")) { throw new AllocationConfigurationException("Missing resource: " + units); + } else { + return matcher.find(0) ? Integer.parseInt(matcher.group(1)) : 0; } - return Integer.parseInt(matcher.group(1)); } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java index 30826b0..f7125d7 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java @@ -20,6 +20,7 @@ import java.util.Collection; import java.util.Comparator; +import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -47,6 +48,8 @@ private DominantResourceFairnessComparator comparator = new DominantResourceFairnessComparator(); + private Set enabledResourceTypes; + @Override public String getName() { return NAME; @@ -113,9 +116,20 @@ public void initialize(Resource clusterCapacity) { comparator.setClusterCapacity(clusterCapacity); } - public static class DominantResourceFairnessComparator implements Comparator { - private static final int NUM_RESOURCES = ResourceType.values().length; - + /** + * Set the resources (cpu/memory/diskio) to be considered in DRF. + * + * @param enabledResourceTypes the resources to be considered in DRF. + */ + public void setEnabledResourceTypes(Set enabledResourceTypes) { + this.enabledResourceTypes = enabledResourceTypes; + } + + public Set getEnabledResourceTypes() { + return this.enabledResourceTypes; + } + + public class DominantResourceFairnessComparator implements Comparator { private Resource clusterCapacity; public void setClusterCapacity(Resource clusterCapacity) { @@ -124,13 +138,16 @@ public void setClusterCapacity(Resource clusterCapacity) { @Override public int compare(Schedulable s1, Schedulable s2) { + if (enabledResourceTypes == null || enabledResourceTypes.isEmpty()) { + return (int)(s1.getStartTime() - s2.getStartTime()); + } + ResourceWeights sharesOfCluster1 = new ResourceWeights(); ResourceWeights sharesOfCluster2 = new ResourceWeights(); ResourceWeights sharesOfMinShare1 = new ResourceWeights(); ResourceWeights sharesOfMinShare2 = new ResourceWeights(); - ResourceType[] resourceOrder1 = new ResourceType[NUM_RESOURCES]; - ResourceType[] resourceOrder2 = new ResourceType[NUM_RESOURCES]; - + ResourceType[] resourceOrder1 = new ResourceType[enabledResourceTypes.size()]; + ResourceType[] resourceOrder2 = new ResourceType[enabledResourceTypes.size()]; // Calculate shares of the cluster for each resource both schedulables. calculateShares(s1.getResourceUsage(), clusterCapacity, sharesOfCluster1, resourceOrder1, s1.getWeights()); @@ -146,7 +163,6 @@ public int compare(Schedulable s1, Schedulable s2) { // for that resource boolean s1Needy = sharesOfMinShare1.getWeight(resourceOrder1[0]) < 1.0f; boolean s2Needy = sharesOfMinShare2.getWeight(resourceOrder2[0]) < 1.0f; - int res = 0; if (!s2Needy && !s1Needy) { res = compareShares(sharesOfCluster1, sharesOfCluster2, @@ -172,45 +188,57 @@ public int compare(Schedulable s1, Schedulable s2) { * it takes up. The resourceOrder vector contains an ordering of resources * by largest share. So if resource=<10 MB, 5 CPU>, and pool=<100 MB, 10 CPU>, * shares will be [.1, .5] and resourceOrder will be [CPU, MEMORY]. + * // FIXME: update the example */ void calculateShares(Resource resource, Resource pool, ResourceWeights shares, ResourceType[] resourceOrder, ResourceWeights weights) { - shares.setWeight(MEMORY, (float)resource.getMemory() / - (pool.getMemory() * weights.getWeight(MEMORY))); - shares.setWeight(CPU, (float)resource.getVirtualCores() / - (pool.getVirtualCores() * weights.getWeight(CPU))); - shares.setWeight(DISKIO, (float)resource.getVirtualDisks() / - (pool.getVirtualDisks() * weights.getWeight(DISKIO))); - // sort order vector by resource share - if (resourceOrder != null) { - if (shares.getWeight(MEMORY) > shares.getWeight(CPU)) { - if (shares.getWeight(MEMORY) > shares.getWeight(DISKIO)) { - resourceOrder[0] = MEMORY; - if (shares.getWeight(CPU) > shares.getWeight(DISKIO)) { - resourceOrder[1] = CPU; - resourceOrder[2] = DISKIO; - } + if (enabledResourceTypes.contains(ResourceType.MEMORY)) { + shares.setWeight(MEMORY, (float)resource.getMemory() / + (pool.getMemory() * weights.getWeight(MEMORY))); + } + if (enabledResourceTypes.contains(ResourceType.CPU)) { + shares.setWeight(CPU, (float) resource.getVirtualCores() / + (pool.getVirtualCores() * weights.getWeight(CPU))); + } + if (enabledResourceTypes.contains(ResourceType.DISKIO)) { + shares.setWeight(DISKIO, (float) resource.getVirtualDisks() / + (pool.getVirtualDisks() * weights.getWeight(DISKIO))); + } + if (resourceOrder == null) { + return; + } + + int position = 0; + if (enabledResourceTypes.contains(ResourceType.MEMORY)) { + resourceOrder[0] = MEMORY; + position ++; + } + if (enabledResourceTypes.contains(ResourceType.CPU)) { + if (position == 0) { + resourceOrder[0] = CPU; + } else { + if (shares.getWeight(MEMORY) >= shares.getWeight(CPU)) { + resourceOrder[1] = CPU; } else { - resourceOrder[0] = DISKIO; + resourceOrder[0] = CPU; resourceOrder[1] = MEMORY; - resourceOrder[2] = CPU; } - } else { - if (shares.getWeight(CPU) > shares.getWeight(DISKIO)) { - resourceOrder[0] = CPU; - if (shares.getWeight(MEMORY) > shares.getWeight(DISKIO)) { - resourceOrder[1] = MEMORY; - resourceOrder[2] = DISKIO; - } else { - resourceOrder[1] = DISKIO; - resourceOrder[2] = MEMORY; - } - } else { - resourceOrder[0] = DISKIO; - resourceOrder[1] = CPU; - resourceOrder[2] = MEMORY; + } + position ++; + } + if (enabledResourceTypes.contains(ResourceType.DISKIO)) { + int startIndex = 0; + while (startIndex < position) { + if (shares.getWeight(DISKIO) >= + shares.getWeight(resourceOrder[startIndex])) { + break; } + startIndex ++; + } + for (int i = position; i > startIndex; i --) { + resourceOrder[i] = resourceOrder[i-1]; } + resourceOrder[startIndex] = DISKIO; } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index c6bab00..b002359 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -44,7 +44,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; -import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Clock; public class FairSchedulerTestBase { @@ -135,6 +134,11 @@ protected ApplicationAttemptId createSchedulingRequest( } protected ApplicationAttemptId createSchedulingRequest( + int memory, int vcores, int vdisks, String queueId, String userId) { + return createSchedulingRequest(memory, vcores, vdisks, queueId, userId, 1); + } + + protected ApplicationAttemptId createSchedulingRequest( int memory, String queueId, String userId, int numContainers) { return createSchedulingRequest(memory, queueId, userId, numContainers, 1); } @@ -145,6 +149,13 @@ protected ApplicationAttemptId createSchedulingRequest( } protected ApplicationAttemptId createSchedulingRequest( + int memory, int vcores, int vdisks, String queueId, String userId, + int numContainers) { + return createSchedulingRequest(memory, vcores, vdisks, queueId, userId, + numContainers, 1); + } + + protected ApplicationAttemptId createSchedulingRequest( int memory, String queueId, String userId, int numContainers, int priority) { return createSchedulingRequest(memory, 1, queueId, userId, numContainers, priority); @@ -218,6 +229,13 @@ protected void createSchedulingRequestExistingApplication( } protected void createSchedulingRequestExistingApplication( + int memory, int vcores, int vdisks, int priority, ApplicationAttemptId attId) { + ResourceRequest request = createResourceRequest(memory, vcores, vdisks, + ResourceRequest.ANY, priority, 1, true); + createSchedulingRequestExistingApplication(request, attId); + } + + protected void createSchedulingRequestExistingApplication( ResourceRequest request, ApplicationAttemptId attId) { List ask = new ArrayList(); ask.add(request); @@ -239,4 +257,4 @@ protected void createApplicationWithAMResource(ApplicationAttemptId attId, new AppAttemptAddedSchedulerEvent(attId, false); scheduler.handle(attempAddedEvent); } -} \ No newline at end of file +} diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java index 751d9ca..bda4891 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java @@ -23,10 +23,12 @@ import java.io.FileWriter; import java.io.PrintWriter; import java.util.List; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.NestedUserQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; @@ -162,11 +164,11 @@ public void testAllocationFileParsing() throws Exception { out.println(""); // Give queue A a minimum of 1024 M out.println(""); - out.println("1024mb,0vcores"); + out.println("1024mb,0vcores,0vdisks"); out.println(""); // Give queue B a minimum of 2048 M out.println(""); - out.println("2048mb,0vcores"); + out.println("2048mb,0vcores,0vdisks"); out.println("alice,bob admins"); out.println("fair"); out.println(""); @@ -355,11 +357,11 @@ public void testBackwardsCompatibleAllocationFileParsing() throws Exception { out.println(""); // Give queue A a minimum of 1024 M out.println(""); - out.println("1024mb,0vcores"); + out.println("1024mb,0vcores,0vdisks"); out.println(""); // Give queue B a minimum of 2048 M out.println(""); - out.println("2048mb,0vcores"); + out.println("2048mb,0vcores,0vdisks"); out.println("alice,bob admins"); out.println(""); // Give queue C no minimum @@ -549,6 +551,144 @@ public void testQueueNameContainingPeriods() throws Exception { allocLoader.setReloadListener(confHolder); allocLoader.reloadAllocations(); } + + @Test + public void testDRFResourceTypeConfiguration() throws Exception { + Configuration conf = new Configuration(); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + // Queue A: cpu + out.println(""); + out.println("CPU"); + out.println("drf"); + out.println(""); + // Queue B: memory + out.println(""); + out.println("MEMORY"); + out.println(""); + // Queue C: diskIO + out.println(""); + out.println("DISKIO"); + out.println(""); + // Queue D: cpu & memory + out.println(""); + out.println("CPU,MEMORY"); + out.println(""); + // Queue E: cpu & diskIO + out.println(""); + out.println("CPU,DISKIO"); + out.println(""); + // Queue F: memory & diskIO + out.println(""); + out.println("MEMORY,DISKIO"); + out.println(""); + // Queue G: cpu & memory & diskIO + out.println(""); + out.println("CPU,MEMORY,DISKIO"); + out.println(""); + // Queue H: default + out.println(""); + out.println(""); + // Set default scheduling policy to DRF + out.println("drf"); + //Default: only enable cpu & memory + out.println("CPU,MEMORY"); + out.println(""); + out.close(); + + allocLoader.init(conf); + ReloadListener confHolder = new ReloadListener(); + allocLoader.setReloadListener(confHolder); + allocLoader.reloadAllocations(); + AllocationConfiguration queueConf = confHolder.allocConf; + + // Queue A: cpu + Set enabledResources = ((DominantResourceFairnessPolicy) + queueConf.getSchedulingPolicy("root.queueA")).getEnabledResourceTypes(); + assertTrue(checkDRFResourceTypes(enabledResources, + new ResourceType[]{ResourceType.CPU})); + // Queue B: memory + enabledResources = ((DominantResourceFairnessPolicy) + queueConf.getSchedulingPolicy("root.queueB")).getEnabledResourceTypes(); + assertTrue(checkDRFResourceTypes(enabledResources, + new ResourceType[]{ResourceType.MEMORY})); + // Queue C: diskIO + enabledResources = ((DominantResourceFairnessPolicy) + queueConf.getSchedulingPolicy("root.queueC")).getEnabledResourceTypes(); + assertTrue(checkDRFResourceTypes(enabledResources, + new ResourceType[]{ResourceType.DISKIO})); + // Queue D: cpu & memory + enabledResources = ((DominantResourceFairnessPolicy) + queueConf.getSchedulingPolicy("root.queueD")).getEnabledResourceTypes(); + assertTrue(checkDRFResourceTypes(enabledResources, + new ResourceType[]{ResourceType.CPU, ResourceType.MEMORY})); + // Queue E: cpu & diskIO + enabledResources = ((DominantResourceFairnessPolicy) + queueConf.getSchedulingPolicy("root.queueE")).getEnabledResourceTypes(); + assertTrue(checkDRFResourceTypes(enabledResources, + new ResourceType[]{ResourceType.CPU, ResourceType.DISKIO})); + // Queue F: memory & diskIO + enabledResources = ((DominantResourceFairnessPolicy) + queueConf.getSchedulingPolicy("root.queueF")).getEnabledResourceTypes(); + assertTrue(checkDRFResourceTypes(enabledResources, + new ResourceType[]{ResourceType.MEMORY, ResourceType.DISKIO})); + // Queue G: cpu & memory & diskIO + enabledResources = ((DominantResourceFairnessPolicy) + queueConf.getSchedulingPolicy("root.queueG")).getEnabledResourceTypes(); + assertTrue(checkDRFResourceTypes(enabledResources, + new ResourceType[]{ResourceType.CPU, ResourceType.MEMORY, + ResourceType.DISKIO})); + // Queue H: default + enabledResources = ((DominantResourceFairnessPolicy) + queueConf.getSchedulingPolicy("root.queueH")).getEnabledResourceTypes(); + assertTrue(checkDRFResourceTypes(enabledResources, + new ResourceType[]{ResourceType.CPU, ResourceType.MEMORY})); + } + + @Test + public void testDRFResourceTypeDefaultConfiguration() throws Exception { + Configuration conf = new Configuration(); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + // Queue A: default + out.println(""); + out.println(""); + // Set default scheduling policy to DRF + out.println("drf"); + out.println(""); + out.close(); + + allocLoader.init(conf); + ReloadListener confHolder = new ReloadListener(); + allocLoader.setReloadListener(confHolder); + allocLoader.reloadAllocations(); + AllocationConfiguration queueConf = confHolder.allocConf; + + // Queue A: cpu + Set enabledResources = ((DominantResourceFairnessPolicy) + queueConf.getSchedulingPolicy("root.queueA")).getEnabledResourceTypes(); + assertTrue(checkDRFResourceTypes(enabledResources, + new ResourceType[]{ResourceType.CPU, ResourceType.MEMORY})); + } + + private boolean checkDRFResourceTypes(Set configured, + ResourceType... types) { + if (types.length != configured.size()) { + return false; + } + for (ResourceType type : types) { + configured.remove(type); + } + return configured.isEmpty(); + } private class ReloadListener implements AllocationFileLoaderService.Listener { public AllocationConfiguration allocConf; diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java index d729963..87927e0 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java @@ -191,6 +191,24 @@ public void testCPU() { } /** + * Test that VDisks works as well as memory + */ + @Test + public void testVDisks() { + scheds.add(new FakeSchedulable(Resources.createResource(0, 0, 20), + new ResourceWeights(2.0f))); + scheds.add(new FakeSchedulable(Resources.createResource(0, 0, 0), + new ResourceWeights(1.0f))); + scheds.add(new FakeSchedulable(Resources.createResource(0, 0, 5), + new ResourceWeights(1.0f))); + scheds.add(new FakeSchedulable(Resources.createResource(0, 0, 15), + new ResourceWeights(0.5f))); + ComputeFairShares.computeShares(scheds, + Resources.createResource(0, 0, 45), ResourceType.DISKIO); + verifyVDisksShares(20, 5, 5, 15); + } + + /** * Check that a given list of shares have been assigned to this.scheds. */ private void verifyMemoryShares(int... shares) { @@ -209,4 +227,14 @@ private void verifyCPUShares(int... shares) { Assert.assertEquals(shares[i], scheds.get(i).getFairShare().getVirtualCores()); } } + + /** + * Check that a given list of shares have been assigned to this.scheds. + */ + private void verifyVDisksShares(int... shares) { + Assert.assertEquals(scheds.size(), shares.length); + for (int i = 0; i < shares.length; i ++) { + Assert.assertEquals(shares[i], scheds.get(i).getFairShare().getVirtualDisks()); + } + } } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 00078cc..19cd047 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -169,12 +169,25 @@ public void testConfValidation() throws Exception { try { scheduler.serviceInit(conf); fail("Exception is expected because the min vcores allocation is" + - " larger than the max vcores allocation."); + " larger than the max vcores allocation."); } catch (YarnRuntimeException e) { // Exception is expected. assertTrue("The thrown exception is not the expected one.", - e.getMessage().startsWith( - "Invalid resource scheduler vcores")); + e.getMessage().startsWith( + "Invalid resource scheduler vcores")); + } + + conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_DISK_VDISKS, 2); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_DISK_VDISKS, 1); + try { + scheduler.serviceInit(conf); + fail("Exception is expected because the min vdisks allocation is" + + " larger than the max vdisks allocation."); + } catch (YarnRuntimeException e) { + // Exception is expected. + assertTrue("The thrown exception is not the expected one.", + e.getMessage().startsWith("Invalid resource scheduler vdisks")); } } @@ -188,17 +201,20 @@ public void testLoadConfigurationOnInitialize() throws IOException { conf.setDouble(FairSchedulerConfiguration.LOCALITY_THRESHOLD_NODE, .5); conf.setDouble(FairSchedulerConfiguration.LOCALITY_THRESHOLD_RACK, .7); conf.setBoolean(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED, - true); + true); conf.setInt(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_SLEEP_MS, - 10); + 10); conf.setInt(FairSchedulerConfiguration.LOCALITY_DELAY_RACK_MS, - 5000); + 5000); conf.setInt(FairSchedulerConfiguration.LOCALITY_DELAY_NODE_MS, - 5000); + 5000); conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_DISK_VDISKS, 5); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_DISK_VDISKS, 2); conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, - 128); + 128); + conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VDISKS, 1); scheduler.init(conf); scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); @@ -208,52 +224,66 @@ public void testLoadConfigurationOnInitialize() throws IOException { Assert.assertEquals(.5, scheduler.nodeLocalityThreshold, .01); Assert.assertEquals(.7, scheduler.rackLocalityThreshold, .01); Assert.assertTrue("The continuous scheduling should be enabled", - scheduler.continuousSchedulingEnabled); + scheduler.continuousSchedulingEnabled); Assert.assertEquals(10, scheduler.continuousSchedulingSleepMs); Assert.assertEquals(5000, scheduler.nodeLocalityDelayMs); Assert.assertEquals(5000, scheduler.rackLocalityDelayMs); Assert.assertEquals(1024, scheduler.getMaximumResourceCapability().getMemory()); Assert.assertEquals(512, scheduler.getMinimumResourceCapability().getMemory()); - Assert.assertEquals(128, - scheduler.getIncrementResourceCapability().getMemory()); + Assert.assertEquals(5, scheduler.getMaximumResourceCapability().getVirtualDisks()); + Assert.assertEquals(2, scheduler.getMinimumResourceCapability().getVirtualDisks()); + Assert.assertEquals(128, + scheduler.getIncrementResourceCapability().getMemory()); + Assert.assertEquals(1, + scheduler.getIncrementResourceCapability().getVirtualDisks()); } - - @Test + + @Test public void testNonMinZeroResourcesSettings() throws IOException { scheduler = new FairScheduler(); YarnConfiguration conf = new YarnConfiguration(); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 256); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 1); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_DISK_VDISKS, 1); conf.setInt( - FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512); + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512); conf.setInt( - FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2); + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2); + conf.setInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VDISKS, 1); scheduler.init(conf); scheduler.reinitialize(conf, null); Assert.assertEquals(256, scheduler.getMinimumResourceCapability().getMemory()); Assert.assertEquals(1, scheduler.getMinimumResourceCapability().getVirtualCores()); + Assert.assertEquals(1, scheduler.getMinimumResourceCapability().getVirtualDisks()); Assert.assertEquals(512, scheduler.getIncrementResourceCapability().getMemory()); Assert.assertEquals(2, scheduler.getIncrementResourceCapability().getVirtualCores()); - } - - @Test - public void testMinZeroResourcesSettings() throws IOException { - scheduler = new FairScheduler(); + Assert.assertEquals(1, scheduler.getIncrementResourceCapability().getVirtualDisks()); + } + + @Test + public void testMinZeroResourcesSettings() throws IOException { + FairScheduler fs = new FairScheduler(); YarnConfiguration conf = new YarnConfiguration(); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 0); + conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_DISK_VDISKS, 0); conf.setInt( - FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512); + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512); conf.setInt( - FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2); - scheduler.init(conf); - scheduler.reinitialize(conf, null); - Assert.assertEquals(0, scheduler.getMinimumResourceCapability().getMemory()); - Assert.assertEquals(0, scheduler.getMinimumResourceCapability().getVirtualCores()); - Assert.assertEquals(512, scheduler.getIncrementResourceCapability().getMemory()); - Assert.assertEquals(2, scheduler.getIncrementResourceCapability().getVirtualCores()); - } - + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2); + conf.setInt( + FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VDISKS, 1); + fs.init(conf); + fs.reinitialize(conf, null); + Assert.assertEquals(0, fs.getMinimumResourceCapability().getMemory()); + Assert.assertEquals(0, fs.getMinimumResourceCapability().getVirtualCores()); + Assert.assertEquals(0, fs.getMinimumResourceCapability().getVirtualDisks()); + Assert.assertEquals(512, fs.getIncrementResourceCapability().getMemory()); + Assert.assertEquals(2, fs.getIncrementResourceCapability().getVirtualCores()); + Assert.assertEquals(1, fs.getIncrementResourceCapability().getVirtualDisks()); + } + @Test public void testAggregateCapacityTracking() throws Exception { scheduler.init(conf); @@ -529,7 +559,7 @@ public void testQueueInfo() throws IOException { // Add one big node (only care about aggregate capacity) RMNode node1 = - MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1, + MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8, 8), 1, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); scheduler.handle(nodeEvent1); @@ -688,17 +718,17 @@ public void testSimpleContainerAllocation() throws IOException { // Add a node RMNode node1 = MockNodes - .newNodeInfo(1, Resources.createResource(1024, 4), 1, "127.0.0.1"); + .newNodeInfo(1, Resources.createResource(1024, 4, 10), 1, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); scheduler.handle(nodeEvent1); // Add another node RMNode node2 = - MockNodes.newNodeInfo(1, Resources.createResource(512, 2), 2, "127.0.0.2"); + MockNodes.newNodeInfo(1, Resources.createResource(512, 2, 5), 2, "127.0.0.2"); NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); scheduler.handle(nodeEvent2); - createSchedulingRequest(512, 2, "queue1", "user1", 2); + createSchedulingRequest(512, 2, 5, "queue1", "user1", 2); scheduler.update(); @@ -708,15 +738,17 @@ public void testSimpleContainerAllocation() throws IOException { // Asked for less than increment allocation. assertEquals(FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemory()); + getResourceUsage().getMemory()); NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2); scheduler.handle(updateEvent2); assertEquals(1024, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getMemory()); + getResourceUsage().getMemory()); assertEquals(2, scheduler.getQueueManager().getQueue("queue1"). - getResourceUsage().getVirtualCores()); + getResourceUsage().getVirtualCores()); + assertEquals(5, scheduler.getQueueManager().getQueue("queue1"). + getResourceUsage().getVirtualDisks()); // verify metrics QueueMetrics queue1Metrics = scheduler.getQueueManager().getQueue("queue1") @@ -1053,10 +1085,10 @@ public void testFairShareWithMinAlloc() throws Exception { out.println(""); out.println(""); out.println(""); - out.println("1024mb,0vcores"); + out.println("1024mb,0vcores,0vdisks"); out.println(""); out.println(""); - out.println("2048mb,0vcores"); + out.println("2048mb,0vcores,0vdisks"); out.println(""); out.println(""); out.close(); @@ -1099,7 +1131,7 @@ public void testNestedUserQueue() throws IOException { out.println(""); out.println(""); out.println(""); - out.println("1024mb,0vcores"); + out.println("1024mb,0vcores,0vdisks"); out.println(""); out.println(""); out.println(""); @@ -1131,7 +1163,7 @@ public void testFairShareAndWeightsInNestedUserQueueRule() throws Exception { out.println(""); out.println(""); out.println(""); - out.println("1024mb,0vcores"); + out.println("1024mb,0vcores,0vdisks"); out.println(""); out.println(""); out.println(""); @@ -1392,15 +1424,15 @@ public void testHierarchicalQueueAllocationFileParsing() throws IOException, SAX out.println(""); out.println(""); out.println(""); - out.println("2048mb,0vcores"); + out.println("2048mb,0vcores,0vdisks"); out.println(""); out.println(""); - out.println("2048mb,0vcores"); + out.println("2048mb,0vcores,0vdisks"); out.println(""); - out.println("2048mb,0vcores"); + out.println("2048mb,0vcores,0vdisks"); out.println(""); out.println(""); - out.println("2048mb,0vcores"); + out.println("2048mb,0vcores,0vdisks"); out.println(""); out.println(""); out.println(""); @@ -1432,10 +1464,10 @@ public void testConfigureRootQueue() throws Exception { out.println(""); out.println(" drf"); out.println(" "); - out.println(" 1024mb,1vcores"); + out.println(" 1024mb,1vcores,1vdisks"); out.println(" "); out.println(" "); - out.println(" 1024mb,4vcores"); + out.println(" 1024mb,4vcores,4vdisks"); out.println(" "); out.println(" 100"); out.println(" 120"); @@ -1547,10 +1579,10 @@ public void testChoiceOfPreemptedContainers() throws Exception { assertEquals(2, scheduler.getSchedulerApp(app4).getLiveContainers().size()); // Now new requests arrive from queueC and default - createSchedulingRequest(1 * 1024, 1, "queueC", "user1", 1, 1); - createSchedulingRequest(1 * 1024, 1, "queueC", "user1", 1, 1); - createSchedulingRequest(1 * 1024, 1, "default", "user1", 1, 1); - createSchedulingRequest(1 * 1024, 1, "default", "user1", 1, 1); + createSchedulingRequest(1 * 1024, 1, 1, "queueC", "user1", 1, 1); + createSchedulingRequest(1 * 1024, 1, 1, "queueC", "user1", 1, 1); + createSchedulingRequest(1 * 1024, 1, 1, "default", "user1", 1, 1); + createSchedulingRequest(1 * 1024, 1, 1, "default", "user1", 1, 1); scheduler.update(); // We should be able to claw back one container from queueA and queueB each. @@ -1713,23 +1745,23 @@ public void testPreemptionDecision() throws Exception { out.println(""); out.println(""); out.println(""); - out.println("0mb,0vcores"); + out.println("0mb,0vcores,0vdisks"); out.println(""); out.println(""); out.println(".25"); - out.println("1024mb,0vcores"); + out.println("1024mb,0vcores,0vdisks"); out.println(""); out.println(""); out.println(".25"); - out.println("1024mb,0vcores"); + out.println("1024mb,0vcores,0vdisks"); out.println(""); out.println(""); out.println(".25"); - out.println("1024mb,0vcores"); + out.println("1024mb,0vcores,0vdisks"); out.println(""); out.println(""); out.println(".25"); - out.println("1024mb,0vcores"); + out.println("1024mb,0vcores,0vdisks"); out.println(""); out.println("5"); out.println("10"); @@ -1743,19 +1775,19 @@ public void testPreemptionDecision() throws Exception { // Create four nodes RMNode node1 = - MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 1, + MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2, 4), 1, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); scheduler.handle(nodeEvent1); RMNode node2 = - MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 2, + MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2, 4), 2, "127.0.0.2"); NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); scheduler.handle(nodeEvent2); RMNode node3 = - MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 3, + MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2, 4), 3, "127.0.0.3"); NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3); scheduler.handle(nodeEvent3); @@ -2183,7 +2215,7 @@ public void testUserMaxRunningApps() throws Exception { // Add a node RMNode node1 = MockNodes - .newNodeInfo(1, Resources.createResource(8192, 8), 1, "127.0.0.1"); + .newNodeInfo(1, Resources.createResource(8192, 8, 10), 1, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); scheduler.handle(nodeEvent1); @@ -2226,53 +2258,53 @@ public void testReservationWhileMultiplePriorities() throws IOException { // Add a node RMNode node1 = MockNodes - .newNodeInfo(1, Resources.createResource(1024, 4), 1, "127.0.0.1"); + .newNodeInfo(1, Resources.createResource(1024, 4, 2), 1, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); scheduler.handle(nodeEvent1); - ApplicationAttemptId attId = createSchedulingRequest(1024, 4, "queue1", + ApplicationAttemptId attId = createSchedulingRequest(1024, 4, 2, "queue1", "user1", 1, 2); scheduler.update(); NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); scheduler.handle(updateEvent); - + FSAppAttempt app = scheduler.getSchedulerApp(attId); assertEquals(1, app.getLiveContainers().size()); - + ContainerId containerId = scheduler.getSchedulerApp(attId) .getLiveContainers().iterator().next().getContainerId(); // Cause reservation to be created - createSchedulingRequestExistingApplication(1024, 4, 2, attId); + createSchedulingRequestExistingApplication(1024, 4, 2, 2, attId); scheduler.update(); scheduler.handle(updateEvent); assertEquals(1, app.getLiveContainers().size()); assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB()); assertEquals(0, scheduler.getRootQueueMetrics().getAvailableVirtualCores()); - + // Create request at higher priority - createSchedulingRequestExistingApplication(1024, 4, 1, attId); + createSchedulingRequestExistingApplication(1024, 4, 2, 1, attId); scheduler.update(); scheduler.handle(updateEvent); - + assertEquals(1, app.getLiveContainers().size()); // Reserved container should will be at higher priority, // since old reservation cannot be satisfied for (RMContainer container : app.getReservedContainers()) { assertEquals(1, container.getReservedPriority().getPriority()); } - + // Complete container scheduler.allocate(attId, new ArrayList(), Arrays.asList(containerId), null, null); assertEquals(1024, scheduler.getRootQueueMetrics().getAvailableMB()); assertEquals(4, scheduler.getRootQueueMetrics().getAvailableVirtualCores()); - + // Schedule at opening scheduler.update(); scheduler.handle(updateEvent); - + // Reserved container (at higher priority) should be run Collection liveContainers = app.getLiveContainers(); assertEquals(1, liveContainers.size()); @@ -2282,7 +2314,7 @@ public void testReservationWhileMultiplePriorities() throws IOException { assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB()); assertEquals(0, scheduler.getRootQueueMetrics().getAvailableVirtualCores()); } - + @Test public void testAclSubmitApplication() throws Exception { // Set acl's @@ -2376,7 +2408,7 @@ public void testFifoWithinQueue() throws Exception { RMNode node1 = MockNodes - .newNodeInfo(1, Resources.createResource(3072, 3), 1, "127.0.0.1"); + .newNodeInfo(1, Resources.createResource(3072, 3, 10), 1, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); scheduler.handle(nodeEvent1); @@ -2421,7 +2453,7 @@ public void testMaxAssign() throws Exception { scheduler.reinitialize(conf, resourceManager.getRMContext()); RMNode node = - MockNodes.newNodeInfo(1, Resources.createResource(16384, 16), 0, + MockNodes.newNodeInfo(1, Resources.createResource(16384, 16, 20), 0, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); @@ -2445,25 +2477,25 @@ public void testMaxAssign() throws Exception { assertEquals("Incorrect number of containers allocated", 8, app .getLiveContainers().size()); } - + @Test(timeout = 3000) public void testMaxAssignWithZeroMemoryContainers() throws Exception { conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0); - + scheduler.init(conf); scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); RMNode node = - MockNodes.newNodeInfo(1, Resources.createResource(16384, 16), 0, + MockNodes.newNodeInfo(1, Resources.createResource(16384, 16, 20), 0, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); scheduler.handle(nodeEvent); ApplicationAttemptId attId = - createSchedulingRequest(0, 1, "root.default", "user", 8); + createSchedulingRequest(0, 1, 1, "root.default", "user", 8); FSAppAttempt app = scheduler.getSchedulerApp(attId); // set maxAssign to 2: only 2 containers should be allocated @@ -2508,10 +2540,10 @@ public void testAssignContainer() throws Exception { RMNode node1 = MockNodes - .newNodeInfo(1, Resources.createResource(8192, 8), 1, "127.0.0.1"); + .newNodeInfo(1, Resources.createResource(8192, 8, 10), 1, "127.0.0.1"); RMNode node2 = MockNodes - .newNodeInfo(1, Resources.createResource(8192, 8), 2, "127.0.0.2"); + .newNodeInfo(1, Resources.createResource(8192, 8, 10), 2, "127.0.0.2"); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); @@ -2562,7 +2594,7 @@ public void testAssignContainer() throws Exception { } } } - + @SuppressWarnings("unchecked") @Test public void testNotAllowSubmitApplication() throws Exception { @@ -2600,9 +2632,9 @@ public void testNotAllowSubmitApplication() throws Exception { submissionContext.setApplicationId(applicationId); submissionContext.setAMContainerSpec(clc); RMApp application = - new RMAppImpl(applicationId, resourceManager.getRMContext(), conf, name, user, - queue, submissionContext, scheduler, masterService, - System.currentTimeMillis(), "YARN", null, null); + new RMAppImpl(applicationId, resourceManager.getRMContext(), conf, name, user, + queue, submissionContext, scheduler, masterService, + System.currentTimeMillis(), "YARN", null, null); resourceManager.getRMContext().getRMApps().putIfAbsent(applicationId, application); application.handle(new RMAppEvent(applicationId, RMAppEventType.START)); @@ -2639,28 +2671,28 @@ public void testRemoveNodeUpdatesRootQueueMetrics() throws IOException { scheduler.reinitialize(conf, resourceManager.getRMContext()); assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB()); - assertEquals(0, scheduler.getRootQueueMetrics().getAvailableVirtualCores()); - - RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024, 4), 1, + assertEquals(0, scheduler.getRootQueueMetrics().getAvailableVirtualCores()); + + RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024, 4, 8), 1, "127.0.0.1"); NodeAddedSchedulerEvent addEvent = new NodeAddedSchedulerEvent(node1); scheduler.handle(addEvent); - + assertEquals(1024, scheduler.getRootQueueMetrics().getAvailableMB()); assertEquals(4, scheduler.getRootQueueMetrics().getAvailableVirtualCores()); scheduler.update(); // update shouldn't change things assertEquals(1024, scheduler.getRootQueueMetrics().getAvailableMB()); assertEquals(4, scheduler.getRootQueueMetrics().getAvailableVirtualCores()); - + NodeRemovedSchedulerEvent removeEvent = new NodeRemovedSchedulerEvent(node1); scheduler.handle(removeEvent); - + assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB()); assertEquals(0, scheduler.getRootQueueMetrics().getAvailableVirtualCores()); scheduler.update(); // update shouldn't change things assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB()); assertEquals(0, scheduler.getRootQueueMetrics().getAvailableVirtualCores()); -} + } @Test public void testStrictLocality() throws IOException { @@ -2797,19 +2829,42 @@ public void testReservationsStrictLocality() throws IOException { scheduler.handle(nodeUpdateEvent); assertEquals(0, app.getReservedContainers().size()); } - + @Test public void testNoMoreCpuOnNode() throws IOException { scheduler.init(conf); scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); - RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2048, 1), + RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2048, 1, 2), 1, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); scheduler.handle(nodeEvent1); - - ApplicationAttemptId attId = createSchedulingRequest(1024, 1, "default", + + ApplicationAttemptId attId = createSchedulingRequest(1024, 1, 1, "default", + "user1", 2); + FSAppAttempt app = scheduler.getSchedulerApp(attId); + scheduler.update(); + + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); + scheduler.handle(updateEvent); + assertEquals(1, app.getLiveContainers().size()); + scheduler.handle(updateEvent); + assertEquals(1, app.getLiveContainers().size()); + } + + @Test + public void testNoMoreVdisksOnNode() throws IOException { + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2048, 2, 1), + 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + ApplicationAttemptId attId = createSchedulingRequest(1024, 1, 1, "default", "user1", 2); FSAppAttempt app = scheduler.getSchedulerApp(attId); scheduler.update(); @@ -2868,25 +2923,29 @@ public void testBasicDRFAssignment() throws Exception { scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); - RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 5)); + RMNode node = MockNodes.newNodeInfo(1, Resource.newInstance(8192, 5, 8)); NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); scheduler.handle(nodeEvent); - ApplicationAttemptId appAttId1 = createSchedulingRequest(2048, 1, "queue1", + ApplicationAttemptId appAttId1 = createSchedulingRequest(2048, 1, 3, "queue1", "user1", 2); FSAppAttempt app1 = scheduler.getSchedulerApp(appAttId1); - ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 2, "queue1", + ApplicationAttemptId appAttId2 = createSchedulingRequest(1024, 2, 1, "queue1", "user1", 2); FSAppAttempt app2 = scheduler.getSchedulerApp(appAttId2); DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); + Set enabledResources = new HashSet(); + enabledResources.add(ResourceType.CPU); + enabledResources.add(ResourceType.MEMORY); + drfPolicy.setEnabledResourceTypes(enabledResources); drfPolicy.initialize(scheduler.getClusterResource()); scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy); scheduler.update(); // First both apps get a container // Then the first gets another container because its dominant share of - // 2048/8192 is less than the other's of 2/5 + // 3/8 is less than the other's of 2/5 NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); scheduler.handle(updateEvent); Assert.assertEquals(1, app1.getLiveContainers().size()); @@ -2910,12 +2969,12 @@ public void testBasicDRFWithQueues() throws Exception { scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); - RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 7), + RMNode node = MockNodes.newNodeInfo(1, Resource.newInstance(8192, 7, 10), 1, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); scheduler.handle(nodeEvent); - ApplicationAttemptId appAttId1 = createSchedulingRequest(3072, 1, "queue1", + ApplicationAttemptId appAttId1 = createSchedulingRequest(3072, 1, 2, "queue1", "user1", 2); FSAppAttempt app1 = scheduler.getSchedulerApp(appAttId1); ApplicationAttemptId appAttId2 = createSchedulingRequest(2048, 2, "queue1", @@ -2924,8 +2983,12 @@ public void testBasicDRFWithQueues() throws Exception { ApplicationAttemptId appAttId3 = createSchedulingRequest(1024, 2, "queue2", "user1", 2); FSAppAttempt app3 = scheduler.getSchedulerApp(appAttId3); - + DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); + Set enabledResources = new HashSet(); + enabledResources.add(ResourceType.CPU); + enabledResources.add(ResourceType.MEMORY); + drfPolicy.setEnabledResourceTypes(enabledResources); drfPolicy.initialize(scheduler.getClusterResource()); scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy); scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy); @@ -2933,6 +2996,7 @@ public void testBasicDRFWithQueues() throws Exception { NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); scheduler.handle(updateEvent); + // (5/8, 3/7, 3/10), (1/8, 2/7, 3/10) Assert.assertEquals(1, app1.getLiveContainers().size()); scheduler.handle(updateEvent); Assert.assertEquals(1, app3.getLiveContainers().size()); @@ -2969,8 +3033,12 @@ public void testDRFHierarchicalQueues() throws Exception { "user1", 2); Thread.sleep(3); // so that start times will be different FSAppAttempt app4 = scheduler.getSchedulerApp(appAttId4); - + DominantResourceFairnessPolicy drfPolicy = new DominantResourceFairnessPolicy(); + Set enabledResources = new HashSet(); + enabledResources.add(ResourceType.CPU); + enabledResources.add(ResourceType.MEMORY); + drfPolicy.setEnabledResourceTypes(enabledResources); drfPolicy.initialize(scheduler.getClusterResource()); scheduler.getQueueManager().getQueue("root").setPolicy(drfPolicy); scheduler.getQueueManager().getQueue("queue1").setPolicy(drfPolicy); @@ -3028,15 +3096,15 @@ public void testHostPortNodeName() throws Exception { NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); scheduler.handle(nodeEvent2); - ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", + ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1", 0); - ResourceRequest nodeRequest = createResourceRequest(1024, + ResourceRequest nodeRequest = createResourceRequest(1024, node1.getNodeID().getHost() + ":" + node1.getNodeID().getPort(), 1, 1, true); - ResourceRequest rackRequest = createResourceRequest(1024, + ResourceRequest rackRequest = createResourceRequest(1024, node1.getRackName(), 1, 1, false); - ResourceRequest anyRequest = createResourceRequest(1024, + ResourceRequest anyRequest = createResourceRequest(1024, ResourceRequest.ANY, 1, 1, false); createSchedulingRequestExistingApplication(nodeRequest, attId1); createSchedulingRequestExistingApplication(rackRequest, attId1); @@ -3151,7 +3219,7 @@ public void testQueueMaxAMShare() throws Exception { scheduler.reinitialize(conf, resourceManager.getRMContext()); RMNode node = - MockNodes.newNodeInfo(1, Resources.createResource(20480, 20), + MockNodes.newNodeInfo(1, Resources.createResource(20480, 20, 20), 0, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); @@ -3166,9 +3234,9 @@ public void testQueueMaxAMShare() throws Exception { scheduler.update(); scheduler.handle(updateEvent); - Resource amResource1 = Resource.newInstance(1024, 1); - Resource amResource2 = Resource.newInstance(2048, 2); - Resource amResource3 = Resource.newInstance(1860, 2); + Resource amResource1 = Resource.newInstance(1024, 1, 1); + Resource amResource2 = Resource.newInstance(2048, 2, 2); + Resource amResource3 = Resource.newInstance(1860, 2, 2); int amPriority = RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority(); // Exceeds no limits ApplicationAttemptId attId1 = createAppAttemptId(1, 1); @@ -3294,7 +3362,7 @@ public void testQueueMaxAMShare() throws Exception { // Check amResource normalization ApplicationAttemptId attId6 = createAppAttemptId(6, 1); createApplicationWithAMResource(attId6, "queue1", "user1", amResource3); - createSchedulingRequestExistingApplication(1860, 2, amPriority, attId6); + createSchedulingRequestExistingApplication(1860, 2, 2, amPriority, attId6); FSAppAttempt app6 = scheduler.getSchedulerApp(attId6); scheduler.update(); scheduler.handle(updateEvent); @@ -3343,7 +3411,7 @@ public void testQueueMaxAMShareDefault() throws Exception { scheduler.reinitialize(conf, resourceManager.getRMContext()); RMNode node = - MockNodes.newNodeInfo(1, Resources.createResource(8192, 20), + MockNodes.newNodeInfo(1, Resources.createResource(8192, 20, 20), 0, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); @@ -3379,7 +3447,7 @@ public void testQueueMaxAMShareDefault() throws Exception { scheduler.handle(updateEvent); } - Resource amResource1 = Resource.newInstance(1024, 1); + Resource amResource1 = Resource.newInstance(1024, 1, 1); int amPriority = RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority(); // The fair share is 2048 MB, and the default maxAMShare is 0.5f, @@ -3506,28 +3574,29 @@ public void testContinuousScheduling() throws Exception { // Add two nodes RMNode node1 = - MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 1, - "127.0.0.1"); + MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8, 10), 1, + "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); scheduler.handle(nodeEvent1); RMNode node2 = - MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8), 2, - "127.0.0.2"); + MockNodes.newNodeInfo(1, Resources.createResource(8 * 1024, 8, 10), 2, + "127.0.0.2"); NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); scheduler.handle(nodeEvent2); // available resource Assert.assertEquals(scheduler.getClusterResource().getMemory(), 16 * 1024); Assert.assertEquals(scheduler.getClusterResource().getVirtualCores(), 16); + Assert.assertEquals(scheduler.getClusterResource().getVirtualDisks(), 20); // send application request ApplicationAttemptId appAttemptId = - createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); + createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++); scheduler.addApplication(appAttemptId.getApplicationId(), "queue11", "user11", false); scheduler.addApplicationAttempt(appAttemptId, false, false); List ask = new ArrayList(); ResourceRequest request = - createResourceRequest(1024, 1, ResourceRequest.ANY, 1, 1, true); + createResourceRequest(1024, 1, 1, ResourceRequest.ANY, 1, 1, true); ask.add(request); scheduler.allocate(appAttemptId, ask, new ArrayList(), null, null); @@ -3542,20 +3611,22 @@ public void testContinuousScheduling() throws Exception { // check consumption Assert.assertEquals(1024, app.getCurrentConsumption().getMemory()); Assert.assertEquals(1, app.getCurrentConsumption().getVirtualCores()); + Assert.assertEquals(1, app.getCurrentConsumption().getVirtualDisks()); // another request request = - createResourceRequest(1024, 1, ResourceRequest.ANY, 2, 1, true); + createResourceRequest(1024, 1, 1, ResourceRequest.ANY, 2, 1, true); ask.clear(); ask.add(request); scheduler.allocate(appAttemptId, ask, new ArrayList(), null, null); // Wait until app gets resources while (app.getCurrentConsumption() - .equals(Resources.createResource(1024, 1, 0))) { } + .equals(Resources.createResource(1024, 1, 1))) { } Assert.assertEquals(2048, app.getCurrentConsumption().getMemory()); Assert.assertEquals(2, app.getCurrentConsumption().getVirtualCores()); + Assert.assertEquals(2, app.getCurrentConsumption().getVirtualDisks()); // 2 containers should be assigned to 2 nodes Set nodes = new HashSet(); @@ -3767,7 +3838,7 @@ public void testBlacklistNodes() throws Exception { final int GB = 1024; String host = "127.0.0.1"; RMNode node = - MockNodes.newNodeInfo(1, Resources.createResource(16 * GB, 16), + MockNodes.newNodeInfo(1, Resources.createResource(16 * GB, 16, 20), 0, host); NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); @@ -3819,12 +3890,12 @@ public void testGetAppsInQueue() throws Exception { scheduler.reinitialize(conf, resourceManager.getRMContext()); ApplicationAttemptId appAttId1 = - createSchedulingRequest(1024, 1, "queue1.subqueue1", "user1"); + createSchedulingRequest(1024, 1, 1, "queue1.subqueue1", "user1"); ApplicationAttemptId appAttId2 = - createSchedulingRequest(1024, 1, "queue1.subqueue2", "user1"); + createSchedulingRequest(1024, 1, 1, "queue1.subqueue2", "user1"); ApplicationAttemptId appAttId3 = - createSchedulingRequest(1024, 1, "default", "user1"); - + createSchedulingRequest(1024, 1, 1, "default", "user1"); + List apps = scheduler.getAppsInQueue("queue1.subqueue1"); assertEquals(1, apps.size()); @@ -3870,34 +3941,34 @@ public void testMoveRunnableApp() throws Exception { FSLeafQueue targetQueue = queueMgr.getLeafQueue("queue2", true); ApplicationAttemptId appAttId = - createSchedulingRequest(1024, 1, "queue1", "user1", 3); + createSchedulingRequest(1024, 1, 1, "queue1", "user1", 3); ApplicationId appId = appAttId.getApplicationId(); - RMNode node = MockNodes.newNodeInfo(1, Resources.createResource(1024)); + RMNode node = MockNodes.newNodeInfo(1, Resources.createResource(1024, 1, 1)); NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); scheduler.handle(nodeEvent); scheduler.handle(updateEvent); - - assertEquals(Resource.newInstance(1024, 1), oldQueue.getResourceUsage()); + + assertEquals(Resource.newInstance(1024, 1, 1), oldQueue.getResourceUsage()); scheduler.update(); - assertEquals(Resource.newInstance(3072, 3), oldQueue.getDemand()); - + assertEquals(Resource.newInstance(3072, 3, 3), oldQueue.getDemand()); + scheduler.moveApplication(appId, "queue2"); FSAppAttempt app = scheduler.getSchedulerApp(appAttId); assertSame(targetQueue, app.getQueue()); assertFalse(oldQueue.getRunnableAppSchedulables().contains(app)); assertTrue(targetQueue.getRunnableAppSchedulables().contains(app)); - assertEquals(Resource.newInstance(0, 0), oldQueue.getResourceUsage()); - assertEquals(Resource.newInstance(1024, 1), targetQueue.getResourceUsage()); + assertEquals(Resource.newInstance(0, 0, 0), oldQueue.getResourceUsage()); + assertEquals(Resource.newInstance(1024, 1, 1), targetQueue.getResourceUsage()); assertEquals(0, oldQueue.getNumRunnableApps()); assertEquals(1, targetQueue.getNumRunnableApps()); assertEquals(1, queueMgr.getRootQueue().getNumRunnableApps()); - + scheduler.update(); - assertEquals(Resource.newInstance(0, 0), oldQueue.getDemand()); - assertEquals(Resource.newInstance(3072, 3), targetQueue.getDemand()); + assertEquals(Resource.newInstance(0, 0, 0), oldQueue.getDemand()); + assertEquals(Resource.newInstance(3072, 3, 3), targetQueue.getDemand()); } - + @Test public void testMoveNonRunnableApp() throws Exception { scheduler.init(conf); @@ -3909,10 +3980,10 @@ public void testMoveNonRunnableApp() throws Exception { FSLeafQueue targetQueue = queueMgr.getLeafQueue("queue2", true); scheduler.getAllocationConfiguration().queueMaxApps.put("root.queue1", 0); scheduler.getAllocationConfiguration().queueMaxApps.put("root.queue2", 0); - + ApplicationAttemptId appAttId = - createSchedulingRequest(1024, 1, "queue1", "user1", 3); - + createSchedulingRequest(1024, 1, 1, "queue1", "user1", 3); + assertEquals(0, oldQueue.getNumRunnableApps()); scheduler.moveApplication(appAttId.getApplicationId(), "queue2"); assertEquals(0, oldQueue.getNumRunnableApps()); @@ -3930,13 +4001,13 @@ public void testMoveMakesAppRunnable() throws Exception { FSLeafQueue oldQueue = queueMgr.getLeafQueue("queue1", true); FSLeafQueue targetQueue = queueMgr.getLeafQueue("queue2", true); scheduler.getAllocationConfiguration().queueMaxApps.put("root.queue1", 0); - + ApplicationAttemptId appAttId = - createSchedulingRequest(1024, 1, "queue1", "user1", 3); - + createSchedulingRequest(1024, 1, 1, "queue1", "user1", 3); + FSAppAttempt app = scheduler.getSchedulerApp(appAttId); assertTrue(oldQueue.getNonRunnableAppSchedulables().contains(app)); - + scheduler.moveApplication(appAttId.getApplicationId(), "queue2"); assertFalse(oldQueue.getNonRunnableAppSchedulables().contains(app)); assertFalse(targetQueue.getNonRunnableAppSchedulables().contains(app)); @@ -3954,10 +4025,10 @@ public void testMoveWouldViolateMaxAppsConstraints() throws Exception { QueueManager queueMgr = scheduler.getQueueManager(); queueMgr.getLeafQueue("queue2", true); scheduler.getAllocationConfiguration().queueMaxApps.put("root.queue2", 0); - + ApplicationAttemptId appAttId = - createSchedulingRequest(1024, 1, "queue1", "user1", 3); - + createSchedulingRequest(1024, 1, 1, "queue1", "user1", 3); + scheduler.moveApplication(appAttId.getApplicationId(), "queue2"); } @@ -3971,18 +4042,18 @@ public void testMoveWouldViolateMaxResourcesConstraints() throws Exception { FSLeafQueue oldQueue = queueMgr.getLeafQueue("queue1", true); queueMgr.getLeafQueue("queue2", true); scheduler.getAllocationConfiguration().maxQueueResources.put("root.queue2", - Resource.newInstance(1024, 1)); + Resource.newInstance(1024, 1, 1)); ApplicationAttemptId appAttId = - createSchedulingRequest(1024, 1, "queue1", "user1", 3); - RMNode node = MockNodes.newNodeInfo(1, Resources.createResource(2048, 2)); + createSchedulingRequest(1024, 1, 1, "queue1", "user1", 3); + RMNode node = MockNodes.newNodeInfo(1, Resources.createResource(2048, 2, 2)); NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node); NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node); scheduler.handle(nodeEvent); scheduler.handle(updateEvent); scheduler.handle(updateEvent); - - assertEquals(Resource.newInstance(2048, 2), oldQueue.getResourceUsage()); + + assertEquals(Resource.newInstance(2048, 2, 2), oldQueue.getResourceUsage()); scheduler.moveApplication(appAttId.getApplicationId(), "queue2"); } @@ -3993,9 +4064,9 @@ public void testMoveToNonexistentQueue() throws Exception { scheduler.reinitialize(conf, resourceManager.getRMContext()); scheduler.getQueueManager().getLeafQueue("queue1", true); - + ApplicationAttemptId appAttId = - createSchedulingRequest(1024, 1, "queue1", "user1", 3); + createSchedulingRequest(1024, 1, 1, "queue1", "user1", 3); scheduler.moveApplication(appAttId.getApplicationId(), "queue2"); } diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerConfiguration.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerConfiguration.java index 82b50a6..cc33f19 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerConfiguration.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerConfiguration.java @@ -20,31 +20,25 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration.parseResourceConfigValue; import static org.junit.Assert.assertEquals; -import java.io.File; +import org.apache.hadoop.yarn.api.records.Resource; -import org.junit.Assert; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.Test; public class TestFairSchedulerConfiguration { @Test public void testParseResourceConfigValue() throws Exception { - assertEquals(BuilderUtils.newResource(1024, 2), - parseResourceConfigValue("2 vcores, 1024 mb")); - assertEquals(BuilderUtils.newResource(1024, 2), - parseResourceConfigValue("1024 mb, 2 vcores")); - assertEquals(BuilderUtils.newResource(1024, 2), - parseResourceConfigValue("2vcores,1024mb")); - assertEquals(BuilderUtils.newResource(1024, 2), - parseResourceConfigValue("1024mb,2vcores")); - assertEquals(BuilderUtils.newResource(1024, 2), - parseResourceConfigValue("1024 mb, 2 vcores")); - assertEquals(BuilderUtils.newResource(1024, 2), - parseResourceConfigValue("1024 Mb, 2 vCores")); - assertEquals(BuilderUtils.newResource(1024, 2), - parseResourceConfigValue(" 1024 mb, 2 vcores ")); + assertEquals(Resource.newInstance(1024, 2, 4), + parseResourceConfigValue("2 vcores, 1024 mb, 4 vdisks")); + assertEquals(Resource.newInstance(1024, 2, 4), + parseResourceConfigValue("1024 mb, 2 vcores, 4 vdisks")); + assertEquals(Resource.newInstance(1024, 2, 4), + parseResourceConfigValue("4 vdisks, 1024 mb, 2 vcores")); + assertEquals(Resource.newInstance(1024, 2, 4), + parseResourceConfigValue("2vcores,1024mb, 4vdisks")); + assertEquals(Resource.newInstance(1024, 2, 4), + parseResourceConfigValue("1024mb,2vcores, 4vdisks")); + assertEquals(Resource.newInstance(1024, 2, 4), + parseResourceConfigValue("4vdisks,1024mb, 2vcores")); } @Test(expected = AllocationConfigurationException.class) @@ -61,6 +55,11 @@ public void testOnlyMemory() throws Exception { public void testOnlyCPU() throws Exception { parseResourceConfigValue("1024vcores"); } + + @Test(expected = AllocationConfigurationException.class) + public void testOnlyVdisks() throws Exception { + parseResourceConfigValue("4vdisks"); + } @Test(expected = AllocationConfigurationException.class) public void testGibberish() throws Exception { diff --git hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java index eb7e792..9f3a490 100644 --- hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java +++ hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java @@ -21,6 +21,8 @@ import static org.junit.Assert.assertTrue; import java.util.Comparator; +import java.util.HashSet; +import java.util.Set; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; @@ -43,7 +45,17 @@ } private Comparator createComparator(Resource capacity) { + Set enabledResourceTypes = new HashSet(); + for (ResourceType type : ResourceType.values()) { + enabledResourceTypes.add(type); + } + return createComparator(capacity, enabledResourceTypes); + } + + private Comparator createComparator(Resource capacity, + Set enabledResourceTypes) { DominantResourceFairnessPolicy policy = new DominantResourceFairnessPolicy(); + policy.setEnabledResourceTypes(enabledResourceTypes); policy.initialize(capacity); return policy.getComparator(); } @@ -90,6 +102,55 @@ public void testDifferentDominantResource() { } @Test + public void testOnlyOneResource() { + Resource capacity = Resource.newInstance(8000, 8, 20); + // only CPU + Set enabledResourceTypes = new HashSet(); + enabledResourceTypes.add(ResourceType.CPU); + assertTrue(createComparator(capacity, enabledResourceTypes).compare( + createSchedulable(4000, 3, 15), + createSchedulable(2000, 5, 3)) < 0); + // only MEMORY + enabledResourceTypes.clear(); + enabledResourceTypes.add(ResourceType.MEMORY); + assertTrue(createComparator(capacity, enabledResourceTypes).compare( + createSchedulable(5000, 3, 15), + createSchedulable(2000, 4, 3)) > 0); + // only DISKIO + enabledResourceTypes.clear(); + enabledResourceTypes.add(ResourceType.DISKIO); + assertTrue(createComparator(capacity, enabledResourceTypes).compare( + createSchedulable(4000, 5, 2), + createSchedulable(2000, 2, 3)) < 0); + } + + @Test + public void testOnlyTwoResources() { + Resource capacity = Resource.newInstance(8000, 8, 20); + // CPU & MEMORY + Set enabledResourceTypes = new HashSet(); + enabledResourceTypes.add(ResourceType.CPU); + enabledResourceTypes.add(ResourceType.MEMORY); + assertTrue(createComparator(capacity, enabledResourceTypes).compare( + createSchedulable(3000, 1, 2), + createSchedulable(2000, 2, 7)) > 0); + + // CPU & DISKIO + enabledResourceTypes.remove(ResourceType.MEMORY); + enabledResourceTypes.add(ResourceType.DISKIO); + assertTrue(createComparator(capacity, enabledResourceTypes).compare( + createSchedulable(3000, 1, 2), + createSchedulable(2000, 2, 7)) < 0); + + // MEMORY & DISKIO + enabledResourceTypes.remove(ResourceType.CPU); + enabledResourceTypes.add(ResourceType.MEMORY); + assertTrue(createComparator(capacity, enabledResourceTypes).compare( + createSchedulable(5000, 3, 2), + createSchedulable(2000, 5, 7)) > 0); + } + + @Test public void testOneIsNeedy() { assertTrue(createComparator(8000, 8, 20).compare( createSchedulable(2000, 5, 1, 0, 6, 0), @@ -156,10 +217,9 @@ public void testCalculateShares() { Resource capacity = Resources.createResource(100, 10, 20); ResourceType[] resourceOrder = new ResourceType[3]; ResourceWeights shares = new ResourceWeights(); - DominantResourceFairnessPolicy.DominantResourceFairnessComparator comparator = - new DominantResourceFairnessPolicy.DominantResourceFairnessComparator(); - comparator.calculateShares(used, capacity, shares, resourceOrder, - ResourceWeights.NEUTRAL); + ((DominantResourceFairnessPolicy.DominantResourceFairnessComparator) + createComparator(capacity)).calculateShares( + used, capacity, shares, resourceOrder, ResourceWeights.NEUTRAL); assertEquals(.1, shares.getWeight(ResourceType.MEMORY), .00001); assertEquals(.5, shares.getWeight(ResourceType.CPU), .00001);