diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java index 20a5a1ff790..0a0e9d627e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import static org.apache.hadoop.metrics2.lib.Interns.info; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .QueueMetricsForCustomResources.MetricsForCustomResource.*; import java.util.ArrayList; import java.util.HashMap; @@ -45,6 +47,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .QueueMetricsForCustomResources.MetricsForCustomResource; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -114,6 +118,7 @@ protected final MetricsSystem metricsSystem; protected final Map users; protected final Configuration conf; + protected final QueueMetricsForCustomResources queueMetricsForCustomResources; protected QueueMetrics(MetricsSystem ms, String queueName, Queue parent, boolean enableUserMetrics, Configuration conf) { @@ -123,6 +128,7 @@ protected QueueMetrics(MetricsSystem ms, String queueName, Queue parent, this.users = 
enableUserMetrics ? new HashMap() : null; metricsSystem = ms; + this.queueMetricsForCustomResources = new QueueMetricsForCustomResources(); this.conf = conf; runningTime = buildBuckets(conf); } @@ -350,9 +356,11 @@ public void moveAppTo(AppSchedulingInfo app) { * @param limit resource limit */ public void setAvailableResourcesToQueue(String partition, Resource limit) { - if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { availableMB.set(limit.getMemorySize()); availableVCores.set(limit.getVirtualCores()); + queueMetricsForCustomResources.set(MetricsForCustomResource.AVAILABLE, + limit); } } @@ -392,7 +400,7 @@ public void setAvailableResourcesToUser(String partition, */ public void incrPendingResources(String partition, String user, int containers, Resource res) { - if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { _incrPendingResources(containers, res); QueueMetrics userMetrics = getUserMetrics(user); if (userMetrics != null) { @@ -408,12 +416,14 @@ private void _incrPendingResources(int containers, Resource res) { pendingContainers.incr(containers); pendingMB.incr(res.getMemorySize() * containers); pendingVCores.incr(res.getVirtualCores() * containers); + queueMetricsForCustomResources.increaseWithMultiplier(PENDING, res, + containers); } public void decrPendingResources(String partition, String user, int containers, Resource res) { - if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { _decrPendingResources(containers, res); QueueMetrics userMetrics = getUserMetrics(user); if (userMetrics != null) { @@ -429,6 +439,8 @@ private void _decrPendingResources(int containers, Resource res) { pendingContainers.decr(containers); pendingMB.decr(res.getMemorySize() * containers); 
pendingVCores.decr(res.getVirtualCores() * containers); + queueMetricsForCustomResources.decreaseWithMultiplier(PENDING, res, + containers); } public void incrNodeTypeAggregations(String user, NodeType type) { @@ -452,12 +464,15 @@ public void incrNodeTypeAggregations(String user, NodeType type) { public void allocateResources(String partition, String user, int containers, Resource res, boolean decrPending) { - if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { allocatedContainers.incr(containers); aggregateContainersAllocated.incr(containers); allocatedMB.incr(res.getMemorySize() * containers); allocatedVCores.incr(res.getVirtualCores() * containers); + queueMetricsForCustomResources.increaseWithMultiplier(ALLOCATED, res, + containers); + if (decrPending) { _decrPendingResources(containers, res); } @@ -479,12 +494,14 @@ public void allocateResources(String partition, String user, * @param res */ public void allocateResources(String partition, String user, Resource res) { - if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { allocatedMB.incr(res.getMemorySize()); allocatedVCores.incr(res.getVirtualCores()); + queueMetricsForCustomResources.increase(ALLOCATED, res); pendingMB.decr(res.getMemorySize()); pendingVCores.decr(res.getVirtualCores()); + queueMetricsForCustomResources.decrease(PENDING, res); QueueMetrics userMetrics = getUserMetrics(user); if (userMetrics != null) { @@ -498,11 +515,14 @@ public void allocateResources(String partition, String user, Resource res) { public void releaseResources(String partition, String user, int containers, Resource res) { - if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { allocatedContainers.decr(containers); 
aggregateContainersReleased.incr(containers); allocatedMB.decr(res.getMemorySize() * containers); allocatedVCores.decr(res.getVirtualCores() * containers); + queueMetricsForCustomResources.decreaseWithMultiplier(ALLOCATED, res, + containers); + QueueMetrics userMetrics = getUserMetrics(user); if (userMetrics != null) { userMetrics.releaseResources(partition, user, containers, res); @@ -519,9 +539,11 @@ public void releaseResources(String partition, * @param user * @param res */ - public void releaseResources(String user, Resource res) { + private void releaseResources(String user, Resource res) { allocatedMB.decr(res.getMemorySize()); allocatedVCores.decr(res.getVirtualCores()); + queueMetricsForCustomResources.decrease(ALLOCATED, res); + QueueMetrics userMetrics = getUserMetrics(user); if (userMetrics != null) { userMetrics.releaseResources(user, res); @@ -552,8 +574,18 @@ public void updatePreemptedVcoreSeconds(long vcoreSeconds) { } } + public void updatePreemptedSecondsForCustomResources(Resource res, + long seconds) { + queueMetricsForCustomResources + .increaseWithMultiplier(AGGREGATE_PREEMPTED_SECONDS, res, seconds); + if (parent != null) { + parent.queueMetricsForCustomResources.increaseWithMultiplier( + AGGREGATE_PREEMPTED_SECONDS, res, seconds); + } + } + public void reserveResource(String partition, String user, Resource res) { - if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { reserveResource(user, res); } } @@ -562,6 +594,7 @@ public void reserveResource(String user, Resource res) { reservedContainers.incr(); reservedMB.incr(res.getMemorySize()); reservedVCores.incr(res.getVirtualCores()); + queueMetricsForCustomResources.increase(RESERVED, res); QueueMetrics userMetrics = getUserMetrics(user); if (userMetrics != null) { userMetrics.reserveResource(user, res); @@ -571,10 +604,11 @@ public void reserveResource(String user, Resource res) { } } - public 
void unreserveResource(String user, Resource res) { + private void unreserveResource(String user, Resource res) { reservedContainers.decr(); reservedMB.decr(res.getMemorySize()); reservedVCores.decr(res.getVirtualCores()); + queueMetricsForCustomResources.decrease(RESERVED, res); QueueMetrics userMetrics = getUserMetrics(user); if (userMetrics != null) { userMetrics.unreserveResource(user, res); @@ -585,7 +619,7 @@ public void unreserveResource(String user, Resource res) { } public void unreserveResource(String partition, String user, Resource res) { - if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { unreserveResource(user, res); } } @@ -647,10 +681,18 @@ public int getAppsKilled() { public int getAppsFailed() { return appsFailed.value(); } - + public Resource getAllocatedResources() { + if (queueMetricsForCustomResources.isThereAnyAllocatedResource()) { + return Resource.newInstance(allocatedMB.value(), allocatedVCores.value(), + queueMetricsForCustomResources.getAllocatedCustomResources()); + } return BuilderUtils.newResource(allocatedMB.value(), - (int) allocatedVCores.value()); + allocatedVCores.value()); + } + + public float getMaxUtilizationOfCustomResources(Resource clusterResource) { + return queueMetricsForCustomResources.getMaxAllocationUtilization(clusterResource); } public long getAllocatedMB() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetricsForCustomResources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetricsForCustomResources.java new file mode 100644 index 00000000000..b2948163195 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetricsForCustomResources.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; + +import java.util.Map; +import java.util.function.BiFunction; + +public class QueueMetricsForCustomResources { + private final Map allocatedCustomResources = Maps.newHashMap(); + private final Map availableCustomResources = Maps.newHashMap(); + private final Map pendingCustomResources = Maps.newHashMap(); + private final Map reservedCustomResources = Maps.newHashMap(); + private final Map aggregatePreemptedSecondsForCustomResources = + Maps.newHashMap(); + + public enum MetricsForCustomResource { + ALLOCATED, AVAILABLE, PENDING, RESERVED, AGGREGATE_PREEMPTED_SECONDS + } + + public void increase(MetricsForCustomResource metricsType, Resource res) { + update(metricsType, res, Long::sum); + } + + public void increaseWithMultiplier(MetricsForCustomResource metricsType, + Resource res, long multiplier) { + update(metricsType, res, (v1, v2) -> v1 + v2 * multiplier); + } + + public void decrease(MetricsForCustomResource metricsType, Resource res) { + update(metricsType, res, (v1, v2) -> v1 - v2); + } + + public void decreaseWithMultiplier(MetricsForCustomResource metricsType, + Resource res, int containers) { + update(metricsType, res, (v1, v2) -> v1 - v2 * containers); + } + + public void set(MetricsForCustomResource metricsType, Resource res) { + update(metricsType, res, (v1, v2) -> v2); + } + + private void update(MetricsForCustomResource metricsType, + Resource res, BiFunction operation) { + if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) { + ResourceInformation[] resources = res.getResources(); + + for (int i = 2; i < resources.length; i++) { + ResourceInformation resource = resources[i]; + Map metrics = 
getMetricsByType(metricsType); + + // Map.merge only applies operation if there is a value for the key in + // the map + if (!metrics.containsKey(resource.getName())) { + metrics.put(resource.getName(), 0L); + } + metrics.merge(resource.getName(), + resource.getValue(), operation); + } + } + } + + boolean isThereAnyAllocatedResource() { + return allocatedCustomResources.size() > 0; + } + + public Map getAllocatedCustomResources() { + return allocatedCustomResources; + } + + public float getMaxAllocationUtilization(Resource clusterResource) { + float maxUtilization = 0; + if (!allocatedCustomResources.isEmpty()) { + for (Map.Entry customRes : allocatedCustomResources + .entrySet()) { + long clusterResourceUtilization = + clusterResource.getResourceValue(customRes.getKey()); + if (clusterResourceUtilization != 0) { + float utilization = + (float) customRes.getValue() / clusterResourceUtilization; + if (utilization > maxUtilization) { + maxUtilization = utilization; + } + } + } + } + return maxUtilization; + } + + @VisibleForTesting + Map getMetricsByType( + MetricsForCustomResource metricsType) { + switch(metricsType) { + case AVAILABLE: + return availableCustomResources; + case ALLOCATED: + return allocatedCustomResources; + case PENDING: + return pendingCustomResources; + case RESERVED: + return reservedCustomResources; + case AGGREGATE_PREEMPTED_SECONDS: + return aggregatePreemptedSecondsForCustomResources; + default: + throw new IllegalStateException("No map found for metrics type: " + + metricsType); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 4b274df956f..4a545562a40 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -2104,7 +2104,8 @@ protected void completedContainerInternal( private void updateQueuePreemptionMetrics( CSQueue queue, RMContainer rmc) { QueueMetrics qMetrics = queue.getMetrics(); - long usedMillis = rmc.getFinishTime() - rmc.getCreationTime(); + final long usedMillis = rmc.getFinishTime() - rmc.getCreationTime(); + final long usedSeconds = usedMillis / DateUtils.MILLIS_PER_SECOND; Resource containerResource = rmc.getAllocatedResource(); qMetrics.preemptContainer(); long mbSeconds = (containerResource.getMemorySize() * usedMillis) @@ -2113,6 +2114,8 @@ private void updateQueuePreemptionMetrics( / DateUtils.MILLIS_PER_SECOND; qMetrics.updatePreemptedMemoryMBSeconds(mbSeconds); qMetrics.updatePreemptedVcoreSeconds(vcSeconds); + qMetrics.updatePreemptedSecondsForCustomResources(containerResource, + usedSeconds); } @Lock(Lock.NoLock.class) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 43a47ae65fe..f34a8b3ca16 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -1185,11 +1185,19 @@ private void updateRootQueueMetrics() { */ private boolean shouldAttemptPreemption() { if (context.isPreemptionEnabled()) { - return (context.getPreemptionUtilizationThreshold() < Math.max( + float maxUtilizationOfCustomResources = rootMetrics + .getMaxUtilizationOfCustomResources(getClusterResource()); + + float maxUtilizationOfNormalResources = Math.max( (float) rootMetrics.getAllocatedMB() / getClusterResource().getMemorySize(), (float) rootMetrics.getAllocatedVirtualCores() / - getClusterResource().getVirtualCores())); + getClusterResource().getVirtualCores()); + + float utilization = Math.max(maxUtilizationOfNormalResources, + maxUtilizationOfCustomResources); + + return (context.getPreemptionUtilizationThreshold() < utilization); } return false; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetricsTestcase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetricsTestcase.java new file mode 100644 index 00000000000..a77ad5f27ec --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetricsTestcase.java @@ -0,0 +1,436 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .QueueMetricsForCustomResources.MetricsForCustomResource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .TestQueueMetrics.MultiQueueSetup; + +import java.util.Map; +import java.util.function.Function; + +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .ResourceMetricsChecker.ResourceMetricsKey.*; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .TestQueueMetricsForCustomResources.assertCustomResourceValue; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .TestQueueMetricsForCustomResources.assertCustomResourceValueInternal; + +public class QueueMetricsTestcase { + + private enum MetricsAssertType { + ALL, PARENT_QUEUE_ONLY + } + + public static final class Builder { + private MetricsSystem ms; + private Map customResourceValues = Maps.newHashMap(); + private int containers; + private int containersToDecrease; + private int vCores; + private int vCoresToDecrease; + private long customResToDecrease; + private 
String parentQueueName; + private String leafQueueName; + private String user; + private String partition; + private long memoryMB; + private long memoryMBToDecrease; + + private Builder() { + } + + public static Builder create() { + return new Builder(); + } + + public Builder withContainers(int containers) { + this.containers = containers; + return this; + } + + public Builder withContainersToDecrease(int containersToDecrease) { + this.containersToDecrease = containersToDecrease; + return this; + } + + public Builder withVCores(int vCores) { + this.vCores = vCores; + return this; + } + + public Builder withVCoresToDecrease(int vCoresToDecrease) { + this.vCoresToDecrease = vCoresToDecrease; + return this; + } + + public Builder withMemoryMB(long memoryMB) { + this.memoryMB = memoryMB; + return this; + } + + public Builder withMemoryMBToDecrease(long memoryMBToDecrease) { + this.memoryMBToDecrease = memoryMBToDecrease; + return this; + } + + public Builder withCustomResourceValue(String name, Long value) { + customResourceValues.put(name, value); + return this; + } + + public Builder withCustomResToDecrease(long customResToDecrease) { + this.customResToDecrease = customResToDecrease; + return this; + } + + public Builder withParentQueueName(String parentQueueName) { + this.parentQueueName = parentQueueName; + return this; + } + + public Builder withLeafQueueName(String leafQueueName) { + this.leafQueueName = leafQueueName; + return this; + } + + public Builder withUser(String user) { + this.user = user; + return this; + } + + public Builder withPartition(String partition) { + this.partition = partition; + return this; + } + + public QueueMetricsTestcase build() { + return new QueueMetricsTestcase(ms, leafQueueName, parentQueueName, user, + partition, customResourceValues, containers, containersToDecrease, + vCores, vCoresToDecrease, memoryMB, memoryMBToDecrease, + customResToDecrease); + } + + public Builder withMetricSystem(MetricsSystem ms) { + this.ms = ms; + 
return this; + } + } + + private static final String CUSTOM_RES_1 = "custom_res_1"; + private static final String CUSTOM_RES_2 = "custom_res_2"; + private static final ResourceMetricsChecker INITIAL_RESOURCE_METRICS_CHECKER + = ResourceMetricsChecker.createInitial(); + + + private final MetricsSystem ms; + private final String leafQueueName; + private final String parentQueueName; + private final Map customResourceValues; + private final int containers; + private final int containersToDecrease; + private final int vCores; + private final int vCoresToDecrease; + private final long memoryMB; + private final long memoryMBToDecrease; + private final long customResToDecrease; + private final String user; + private final String partition; + + + private QueueMetricsTestcase(MetricsSystem ms, String leafQueueName, String + parentQueueName, String user, String partition, Map + customResourceValues, int containers, int containersToDecrease, int + vCores, int vCoresToDecrease, long memoryMB, long memoryMBToDecrease, + long customResToDecrease) { + this.ms = ms; + this.leafQueueName = leafQueueName; + this.parentQueueName = parentQueueName; + this.customResourceValues = customResourceValues; + this.containers = containers; + this.containersToDecrease = containersToDecrease; + this.vCores = vCores; + this.vCoresToDecrease = vCoresToDecrease; + this.memoryMB = memoryMB; + this.memoryMBToDecrease = memoryMBToDecrease; + this.customResToDecrease = customResToDecrease; + this.user = user; + this.partition = partition; + } + + void testIncreasePendingResources() { + testIncreasePendingResourcesInternal(true); + } + + void testIncreasePendingResourcesWithoutContainer() { + testIncreasePendingResourcesInternal(false); + } + + private void testIncreasePendingResourcesInternal(boolean useContainers) { + final int containers; + if (useContainers) { + containers = this.containers; + } else { + containers = 1; + } + + MultiQueueSetup mqs = new MultiQueueSetup(ms, + parentQueueName, 
leafQueueName, user); + mqs.leafMetrics.incrPendingResources(partition, mqs.user, containers, + createResource()); + + ResourceMetricsChecker checker = ResourceMetricsChecker + .createFromChecker(INITIAL_RESOURCE_METRICS_CHECKER, + mqs.leafQueueSource) + .gaugeInt(PENDING_CONTAINERS, containers) + .gaugeLong(PENDING_MB, containers * memoryMB) + .gaugeInt(PENDING_V_CORES, containers * vCores) + .checkAll(); + assertMetrics(mqs, checker, MetricsForCustomResource.PENDING, + getModifiedCustomResourceValues(v -> v * containers)); + } + + public void testDecreasePendingResources() { + final int containersAfterDecrease = containers - containersToDecrease; + final int vcoresAfterDecrease = + (vCores * containers) - (vCoresToDecrease * containersToDecrease); + final long memoryAfterDecrease = (memoryMB * containers) + - (memoryMBToDecrease * containersToDecrease); + + MultiQueueSetup mqs = new MultiQueueSetup(ms, + parentQueueName, leafQueueName, user); + mqs.leafMetrics.decrPendingResources(partition, user, containersToDecrease, + ResourceTypesTestHelper.newResource(memoryMBToDecrease, vCoresToDecrease, + ImmutableMap. 
builder() + .put(CUSTOM_RES_1, String.valueOf(customResToDecrease)) + .put(CUSTOM_RES_2, String.valueOf(customResToDecrease)) + .build())); + + ResourceMetricsChecker checker = ResourceMetricsChecker + .createFromChecker(INITIAL_RESOURCE_METRICS_CHECKER, + mqs.leafQueueSource) + .gaugeInt(PENDING_CONTAINERS, containersAfterDecrease) + .gaugeLong(PENDING_MB, memoryAfterDecrease) + .gaugeInt(PENDING_V_CORES, vcoresAfterDecrease) + .checkAll(); + + assertMetrics(mqs, checker, MetricsForCustomResource.PENDING, + getModifiedCustomResourceValues(v -> v * containers + - (customResToDecrease * containersToDecrease))); + } + + public void testAllocateResources(boolean decreasePending) { + MultiQueueSetup mqs = new MultiQueueSetup(ms, + parentQueueName, leafQueueName, user); + mqs.leafMetrics.allocateResources(partition, mqs.user, containers, + createResource(), decreasePending); + + ResourceMetricsChecker checker = ResourceMetricsChecker + .createFromChecker(INITIAL_RESOURCE_METRICS_CHECKER, + mqs.leafQueueSource) + .gaugeInt(ALLOCATED_CONTAINERS, containers) + .counter(AGGREGATE_CONTAINERS_ALLOCATED, containers) + .gaugeLong(ALLOCATED_MB, containers * memoryMB) + .gaugeInt(ALLOCATED_V_CORES, containers * vCores) + .gaugeInt(PENDING_CONTAINERS, 0) + .gaugeLong(PENDING_MB, 0) + .gaugeInt(PENDING_V_CORES, 0) + .checkAll(); + if (decreasePending) { + assertMetrics(mqs, checker, MetricsForCustomResource.PENDING, + getModifiedCustomResourceValues(v -> 0L)); + } + if (!customResourceValues.isEmpty()) { + assertMetrics(mqs, checker, MetricsForCustomResource.ALLOCATED, + getModifiedCustomResourceValues(v -> v * containers)); + } + } + + public void testAllocateResourcesWithoutContainer() { + MultiQueueSetup mqs = new MultiQueueSetup(ms, + parentQueueName, leafQueueName, user); + mqs.leafMetrics.allocateResources(partition, mqs.user, + createResource()); + + ResourceMetricsChecker checker = ResourceMetricsChecker + .createFromChecker(INITIAL_RESOURCE_METRICS_CHECKER, + 
mqs.leafQueueSource) + .gaugeLong(ALLOCATED_MB, memoryMB) + .gaugeInt(ALLOCATED_V_CORES, vCores) + .gaugeInt(PENDING_CONTAINERS, 1) + .gaugeLong(PENDING_MB, 0) + .gaugeInt(PENDING_V_CORES, 0) + .checkAll(); + assertMetrics(mqs, checker, MetricsForCustomResource.PENDING, + getModifiedCustomResourceValues(v -> 0L)); + assertMetrics(mqs, checker, MetricsForCustomResource.ALLOCATED, + getModifiedCustomResourceValues(v -> v)); + } + + public void testReleaseResourcesWithContainers() { + MultiQueueSetup mqs = new MultiQueueSetup(ms, + parentQueueName, leafQueueName, user); + mqs.leafMetrics.releaseResources(partition, mqs.user, containers, + createResource()); + + ResourceMetricsChecker checker = ResourceMetricsChecker + .createFromChecker(INITIAL_RESOURCE_METRICS_CHECKER, + mqs.leafQueueSource) + .counter(AGGREGATE_CONTAINERS_ALLOCATED, containers) + .counter(AGGREGATE_CONTAINERS_RELEASED, containers) + .checkAll(); + assertMetrics(mqs, checker, MetricsForCustomResource.ALLOCATED, + getModifiedCustomResourceValues(v -> 0L)); + } + + public void testUpdatePreemptedSeconds(int seconds) { + MultiQueueSetup mqs = new MultiQueueSetup(ms, + parentQueueName, leafQueueName, user); + mqs.leafMetrics.updatePreemptedMemoryMBSeconds(memoryMB * seconds); + mqs.leafMetrics.updatePreemptedVcoreSeconds(vCores * seconds); + mqs.leafMetrics.updatePreemptedSecondsForCustomResources( + createResource(), seconds); + + ResourceMetricsChecker checker = ResourceMetricsChecker + .createFromChecker(INITIAL_RESOURCE_METRICS_CHECKER, + mqs.leafQueueSource) + .counter(AGGREGATE_MEMORY_MB_SECONDS_PREEMPTED, memoryMB * seconds) + .counter(AGGREGATE_VCORE_SECONDS_PREEMPTED, vCores * seconds) + .checkAll(); + assertMetricsParentQueueOnly(mqs, checker, + MetricsForCustomResource.AGGREGATE_PREEMPTED_SECONDS, + getModifiedCustomResourceValues(v -> v * seconds)); + } + + public void testReserveResources() { + MultiQueueSetup mqs = new MultiQueueSetup(ms, + parentQueueName, leafQueueName, user); + 
mqs.leafMetrics.reserveResource(partition, mqs.user, createResource()); + + ResourceMetricsChecker checker = ResourceMetricsChecker + .createFromChecker(INITIAL_RESOURCE_METRICS_CHECKER, + mqs.leafQueueSource) + .gaugeInt(RESERVED_CONTAINERS, 1) + .gaugeLong(RESERVED_MB, memoryMB) + .gaugeInt(RESERVED_V_CORES, vCores) + .checkAll(); + assertMetrics(mqs, checker, MetricsForCustomResource.RESERVED, + getModifiedCustomResourceValues(v -> v)); + } + + public void testUnreserveResources() { + MultiQueueSetup mqs = new MultiQueueSetup(ms, + parentQueueName, leafQueueName, user); + mqs.leafMetrics.unreserveResource(partition, mqs.user, + createResource()); + + ResourceMetricsChecker checker = ResourceMetricsChecker + .createFromChecker(INITIAL_RESOURCE_METRICS_CHECKER, + mqs.leafQueueSource) + .gaugeInt(RESERVED_CONTAINERS, 0) + .gaugeLong(RESERVED_MB, 0) + .gaugeInt(RESERVED_V_CORES, 0) + .checkAll(); + assertMetrics(mqs, checker, MetricsForCustomResource.RESERVED, + getModifiedCustomResourceValues(v -> 0L)); + } + + public void testGetAllocatedResources() { + testAllocateResources(false); + MultiQueueSetup mqs = new MultiQueueSetup(ms, + parentQueueName, leafQueueName, user); + Resource res = mqs.leafMetrics.getAllocatedResources(); + if (customResourceValues.size() > 0) { + assertCustomResourceValueInternal(MetricsForCustomResource.ALLOCATED, + CUSTOM_RES_1, customResourceValues.get(CUSTOM_RES_1) * containers, + res.getResourceValue(CUSTOM_RES_1)); + assertCustomResourceValueInternal(MetricsForCustomResource.ALLOCATED, + CUSTOM_RES_2, customResourceValues.get(CUSTOM_RES_2) * containers, + res.getResourceValue(CUSTOM_RES_2)); + } + } + + private void assertMetrics(MultiQueueSetup mqs, + ResourceMetricsChecker checker, + MetricsForCustomResource metricsType, + Map expectedCustomResourceValues) { + assertMetricsInternal(mqs, checker, metricsType, MetricsAssertType.ALL, + expectedCustomResourceValues); + } + + private void assertMetricsParentQueueOnly(MultiQueueSetup mqs, 
+ ResourceMetricsChecker checker, + MetricsForCustomResource metricsType, + Map expectedCustomResourceValues) { + assertMetricsInternal(mqs, checker, metricsType, + MetricsAssertType.PARENT_QUEUE_ONLY, expectedCustomResourceValues); + } + + private void assertMetricsInternal(MultiQueueSetup mqs, + ResourceMetricsChecker checker, MetricsForCustomResource metricsType, + MetricsAssertType metricsAssertType, + Map expectedCustomResourceValues) { + checker = ResourceMetricsChecker + .createFromChecker(checker, mqs.parentQueueSource).checkAll(); + + if (metricsAssertType == MetricsAssertType.ALL) { + checker = ResourceMetricsChecker + .createFromChecker(checker, mqs.userSource).checkAll(); + ResourceMetricsChecker.createFromChecker(checker, mqs.parentUserSource) + .checkAll(); + } + + assertCustomResourceValue(mqs.parentMetrics, metricsType, CUSTOM_RES_1, + expectedCustomResourceValues.get(CUSTOM_RES_1)); + assertCustomResourceValue(mqs.parentMetrics, metricsType, CUSTOM_RES_2, + expectedCustomResourceValues.get(CUSTOM_RES_2)); + assertCustomResourceValue(mqs.leafMetrics, metricsType, CUSTOM_RES_1, + expectedCustomResourceValues.get(CUSTOM_RES_1)); + assertCustomResourceValue(mqs.leafMetrics, metricsType, CUSTOM_RES_2, + expectedCustomResourceValues.get(CUSTOM_RES_2)); + } + + private Map getModifiedCustomResourceValues( + Function func) { + Map modifiedValues = Maps.newHashMap(); + for (Map.Entry res : customResourceValues + .entrySet()) { + modifiedValues.put(res.getKey(), func.apply(res.getValue())); + } + return modifiedValues; + } + + private Resource createResource() { + if (!customResourceValues.isEmpty()) { + return ResourceTypesTestHelper.newResource(memoryMB, vCores, + ImmutableMap. 
builder() + .put(CUSTOM_RES_1, + String.valueOf(customResourceValues.get(CUSTOM_RES_1))) + .put(CUSTOM_RES_2, + String.valueOf(customResourceValues.get(CUSTOM_RES_2))) + .build()); + } + return ResourceTypesTestHelper.newResource(memoryMB, vCores, + Maps.newHashMap()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceMetricsChecker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceMetricsChecker.java index 9e87cd5b5d4..c6fea9b8a2b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceMetricsChecker.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceMetricsChecker.java @@ -19,7 +19,6 @@ import com.google.common.collect.Maps; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.metrics2.MetricsSource; -import org.apache.hadoop.test.MetricsAsserts; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,58 +27,64 @@ import static org.apache.hadoop.test.MetricsAsserts.assertCounter; import static org.apache.hadoop.test.MetricsAsserts.assertGauge; import static org.apache.hadoop.test.MetricsAsserts.getMetrics; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler - .ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_ALLOCATED; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler - .ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_RELEASED; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler - .ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CONTAINERS; 
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler - .ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_MB; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler - .ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_V_CORES; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler - .ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_MB; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler - .ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_V_CORES; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler - .ResourceMetricsChecker.ResourceMetricsKey.PENDING_CONTAINERS; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricType.COUNTER_LONG; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricType.GAUGE_INT; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricType.GAUGE_LONG; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_ALLOCATED; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_RELEASED; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CONTAINERS; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_MB; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_V_CORES; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_MB; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_V_CORES; +import static 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_CONTAINERS; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_MB; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_V_CORES; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler - .ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CONTAINERS; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler - .ResourceMetricsChecker.ResourceMetricsKey.RESERVED_MB; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CONTAINERS; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_MB; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_V_CORES; class ResourceMetricsChecker { final static Logger LOG = LoggerFactory.getLogger(ResourceMetricsChecker.class); + enum ResourceMetricType { + GAUGE_INT, GAUGE_LONG, COUNTER_INT, COUNTER_LONG + } + enum ResourceMetricsKey { - ALLOCATED_MB("AllocatedMB"), - ALLOCATED_V_CORES("AllocatedVCores"), - ALLOCATED_CONTAINERS("AllocatedContainers"), - AGGREGATE_CONTAINERS_ALLOCATED("AggregateContainersAllocated"), - AGGREGATE_CONTAINERS_RELEASED("AggregateContainersReleased"), - AVAILABLE_MB("AvailableMB"), - AVAILABLE_V_CORES("AvailableVCores"), - PENDING_MB("PendingMB"), - PENDING_V_CORES("PendingVCores"), - PENDING_CONTAINERS("PendingContainers"), - RESERVED_MB("ReservedMB"), - RESERVED_V_CORES("ReservedVCores"), - RESERVED_CONTAINERS("ReservedContainers"); + ALLOCATED_MB("AllocatedMB", GAUGE_LONG), + ALLOCATED_V_CORES("AllocatedVCores", GAUGE_INT), + ALLOCATED_CONTAINERS("AllocatedContainers", GAUGE_INT), + 
AGGREGATE_CONTAINERS_ALLOCATED("AggregateContainersAllocated", COUNTER_LONG), + AGGREGATE_CONTAINERS_RELEASED("AggregateContainersReleased", COUNTER_LONG), + AVAILABLE_MB("AvailableMB", GAUGE_LONG), + AVAILABLE_V_CORES("AvailableVCores", GAUGE_INT), + PENDING_MB("PendingMB", GAUGE_LONG), + PENDING_V_CORES("PendingVCores", GAUGE_INT), + PENDING_CONTAINERS("PendingContainers", GAUGE_INT), + RESERVED_MB("ReservedMB", GAUGE_LONG), + RESERVED_V_CORES("ReservedVCores", GAUGE_INT), + RESERVED_CONTAINERS("ReservedContainers", GAUGE_INT), + AGGREGATE_VCORE_SECONDS_PREEMPTED("AggregateVcoreSecondsPreempted", + COUNTER_LONG), AGGREGATE_MEMORY_MB_SECONDS_PREEMPTED( + "AggregateMemoryMBSecondsPreempted", COUNTER_LONG); private String value; - - ResourceMetricsKey(String value) { + private ResourceMetricType type; + + ResourceMetricsKey(String value, ResourceMetricType type) { this.value = value; + this.type = type; } public String getValue() { return value; } + + public ResourceMetricType getType() { + return type; + } } private MetricsRecordBuilder recordBuilder; @@ -138,19 +143,30 @@ public static ResourceMetricsChecker createInitial() { } ResourceMetricsChecker gaugeLong(ResourceMetricsKey key, long value) { + ensureTypeIsCorrect(key, GAUGE_LONG); gaugesLong.put(key, value); return this; } ResourceMetricsChecker gaugeInt(ResourceMetricsKey key, int value) { + ensureTypeIsCorrect(key, GAUGE_INT); gaugesInt.put(key, value); return this; } ResourceMetricsChecker counter(ResourceMetricsKey key, long value) { + ensureTypeIsCorrect(key, COUNTER_LONG); counters.put(key, value); return this; } + + private void ensureTypeIsCorrect(ResourceMetricsKey + key, ResourceMetricType actualType) { + if (key.type != actualType) { + throw new IllegalStateException("Metrics type should be " + key.type + + " instead of " + actualType + " for metrics: " + key.value); + } + } ResourceMetricsChecker checkAll() { if (recordBuilder == null) { diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java index 76b8090b5b1..9b57ae21c26 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java @@ -47,6 +47,37 @@ import org.junit.Test; public class TestQueueMetrics { + static class MultiQueueSetup { + final MetricsSource parentQueueSource; + final MetricsSource leafQueueSource; + final MetricsSource userSource; + final MetricsSource parentUserSource; + final QueueMetrics parentMetrics; + final QueueMetrics leafMetrics; + final String parentQueueName; + final String leafQueueName; + final String user; + + MultiQueueSetup(MetricsSystem ms, String parentQueueName, + String leafQueueName, String user) { + this.parentQueueName = parentQueueName; + this.leafQueueName = leafQueueName; + this.user = user; + + parentMetrics = + QueueMetrics.forQueue(ms, parentQueueName, null, true, conf); + Queue parentQueue = mock(Queue.class); + when(parentQueue.getMetrics()).thenReturn(parentMetrics); + leafMetrics = + QueueMetrics.forQueue(ms, leafQueueName, parentQueue, true, conf); + parentQueueSource = queueSource(ms, parentQueueName); + leafQueueSource = queueSource(ms, leafQueueName); + + leafMetrics.submitApp(user); + userSource = userSource(ms, leafQueueName, user); + parentUserSource = userSource(ms, parentQueueName, user); + } + } private static final ResourceMetricsChecker INITIAL_RESOURCE_METRICS_CHECKER = 
ResourceMetricsChecker.createInitial(); @@ -111,7 +142,7 @@ public void testDefaultSingleQueueMetrics() { .gaugeLong(ALLOCATED_MB, 6 * GB) .gaugeInt(ALLOCATED_V_CORES, 6) .gaugeInt(ALLOCATED_CONTAINERS, 3) - .counter(AGGREGATE_CONTAINERS_ALLOCATED, 3) + .counter(AGGREGATE_CONTAINERS_ALLOCATED, 3) .gaugeLong(PENDING_MB, 9 * GB) .gaugeInt(PENDING_V_CORES, 9) .gaugeInt(PENDING_CONTAINERS, 2) @@ -314,7 +345,7 @@ public void testSingleQueueWithUserMetrics() { .gaugeLong(ALLOCATED_MB, 6 * GB) .gaugeInt(ALLOCATED_V_CORES, 6) .gaugeInt(ALLOCATED_CONTAINERS, 3) - .counter(AGGREGATE_CONTAINERS_ALLOCATED, 3) + .counter(AGGREGATE_CONTAINERS_ALLOCATED, 3) .gaugeLong(PENDING_MB, 9 * GB) .gaugeInt(PENDING_V_CORES, 9) .gaugeInt(PENDING_CONTAINERS, 2) @@ -408,63 +439,53 @@ public void testTwoLevelWithUserMetrics() { String parentQueueName = "root"; String leafQueueName = "root.leaf"; String user = "alice"; - - QueueMetrics parentMetrics = - QueueMetrics.forQueue(ms, parentQueueName, null, true, conf); - Queue parentQueue = mock(Queue.class); - when(parentQueue.getMetrics()).thenReturn(parentMetrics); - QueueMetrics metrics = - QueueMetrics.forQueue(ms, leafQueueName, parentQueue, true, conf); - MetricsSource parentQueueSource = queueSource(ms, parentQueueName); - MetricsSource queueSource = queueSource(ms, leafQueueName); AppSchedulingInfo app = mockApp(user); - metrics.submitApp(user); - MetricsSource userSource = userSource(ms, leafQueueName, user); - MetricsSource parentUserSource = userSource(ms, parentQueueName, user); + MultiQueueSetup mqs = new MultiQueueSetup(ms, + parentQueueName, leafQueueName, user); AppMetricsChecker appMetricsChecker = AppMetricsChecker - .createFromChecker(INITIAL_APP_METRICS_CHECKER, queueSource, true) + .createFromChecker(INITIAL_APP_METRICS_CHECKER, mqs.leafQueueSource, true) .counter(APPS_SUBMITTED, 1) .checkAll(); appMetricsChecker = AppMetricsChecker - .createFromChecker(appMetricsChecker, parentQueueSource, true) + 
.createFromChecker(appMetricsChecker, mqs.parentQueueSource, true) .checkAll(); appMetricsChecker = AppMetricsChecker - .createFromChecker(appMetricsChecker, userSource, true) + .createFromChecker(appMetricsChecker, mqs.userSource, true) .checkAll(); appMetricsChecker = AppMetricsChecker - .createFromChecker(appMetricsChecker, parentUserSource, true) + .createFromChecker(appMetricsChecker, mqs.parentUserSource, true) .checkAll(); - metrics.submitAppAttempt(user); + mqs.leafMetrics.submitAppAttempt(user); appMetricsChecker = AppMetricsChecker - .createFromChecker(appMetricsChecker, queueSource, true) + .createFromChecker(appMetricsChecker, mqs.leafQueueSource, true) .gaugeInt(APPS_PENDING, 1) .checkAll(); appMetricsChecker = AppMetricsChecker - .createFromChecker(appMetricsChecker, parentQueueSource, true) + .createFromChecker(appMetricsChecker, mqs.parentQueueSource, true) .checkAll(); appMetricsChecker = AppMetricsChecker - .createFromChecker(appMetricsChecker, userSource, true) + .createFromChecker(appMetricsChecker, mqs.userSource, true) .checkAll(); appMetricsChecker = AppMetricsChecker - .createFromChecker(appMetricsChecker, parentUserSource, true) + .createFromChecker(appMetricsChecker, mqs.parentUserSource, true) .checkAll(); - parentMetrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL, + mqs.parentMetrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL, Resources.createResource(100*GB, 100)); - metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL, + mqs.leafMetrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL, Resources.createResource(100*GB, 100)); - parentMetrics.setAvailableResourcesToUser(RMNodeLabelsManager.NO_LABEL, + mqs.parentMetrics.setAvailableResourcesToUser(RMNodeLabelsManager.NO_LABEL, user, Resources.createResource(10*GB, 10)); - metrics.setAvailableResourcesToUser(RMNodeLabelsManager.NO_LABEL, + mqs.leafMetrics.setAvailableResourcesToUser(RMNodeLabelsManager.NO_LABEL, user, 
Resources.createResource(10*GB, 10)); - metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL, + mqs.leafMetrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL, user, 5, Resources.createResource(3*GB, 3)); ResourceMetricsChecker rmChecker = ResourceMetricsChecker - .createFromChecker(INITIAL_RESOURCE_METRICS_CHECKER, queueSource) + .createFromChecker(INITIAL_RESOURCE_METRICS_CHECKER, mqs.leafQueueSource) .gaugeLong(AVAILABLE_MB, 100 * GB) .gaugeInt(AVAILABLE_V_CORES, 100) .gaugeLong(PENDING_MB, 15 * GB) @@ -472,35 +493,35 @@ public void testTwoLevelWithUserMetrics() { .gaugeInt(PENDING_CONTAINERS, 5) .checkAll(); rmChecker = ResourceMetricsChecker - .createFromChecker(rmChecker, parentQueueSource) + .createFromChecker(rmChecker, mqs.parentQueueSource) .checkAll(); rmChecker = ResourceMetricsChecker - .createFromChecker(rmChecker, userSource) + .createFromChecker(rmChecker, mqs.userSource) .gaugeLong(AVAILABLE_MB, 10 * GB) .gaugeInt(AVAILABLE_V_CORES, 10) .checkAll(); rmChecker = ResourceMetricsChecker - .createFromChecker(rmChecker, parentUserSource) + .createFromChecker(rmChecker, mqs.parentUserSource) .checkAll(); - metrics.runAppAttempt(app.getApplicationId(), user); + mqs.leafMetrics.runAppAttempt(app.getApplicationId(), user); appMetricsChecker = AppMetricsChecker - .createFromChecker(appMetricsChecker, queueSource, true) + .createFromChecker(appMetricsChecker, mqs.leafQueueSource, true) .gaugeInt(APPS_PENDING, 0) .gaugeInt(APPS_RUNNING, 1) .checkAll(); appMetricsChecker = AppMetricsChecker - .createFromChecker(appMetricsChecker, userSource, true) + .createFromChecker(appMetricsChecker, mqs.userSource, true) .checkAll(); - metrics.allocateResources(RMNodeLabelsManager.NO_LABEL, + mqs.leafMetrics.allocateResources(RMNodeLabelsManager.NO_LABEL, user, 3, Resources.createResource(2*GB, 2), true); - metrics.reserveResource(RMNodeLabelsManager.NO_LABEL, + mqs.leafMetrics.reserveResource(RMNodeLabelsManager.NO_LABEL, user, Resources.createResource(3*GB, 3)); 
// Available resources is set externally, as it depends on dynamic // configurable cluster/queue resources rmChecker = ResourceMetricsChecker - .createFromChecker(rmChecker, queueSource) + .createFromChecker(rmChecker, mqs.leafQueueSource) .gaugeLong(AVAILABLE_MB, 100 * GB) .gaugeInt(AVAILABLE_V_CORES, 100) .gaugeLong(ALLOCATED_MB, 6 * GB) @@ -515,23 +536,23 @@ public void testTwoLevelWithUserMetrics() { .gaugeInt(RESERVED_CONTAINERS, 1) .checkAll(); rmChecker = ResourceMetricsChecker - .createFromChecker(rmChecker, queueSource) + .createFromChecker(rmChecker, mqs.parentQueueSource) .checkAll(); rmChecker = ResourceMetricsChecker - .createFromChecker(rmChecker, userSource) + .createFromChecker(rmChecker, mqs.userSource) .gaugeLong(AVAILABLE_MB, 10 * GB) .gaugeInt(AVAILABLE_V_CORES, 10) .checkAll(); rmChecker = ResourceMetricsChecker - .createFromChecker(rmChecker, parentUserSource) + .createFromChecker(rmChecker, mqs.parentUserSource) .checkAll(); - metrics.releaseResources(RMNodeLabelsManager.NO_LABEL, + mqs.leafMetrics.releaseResources(RMNodeLabelsManager.NO_LABEL, user, 1, Resources.createResource(2*GB, 2)); - metrics.unreserveResource(RMNodeLabelsManager.NO_LABEL, + mqs.leafMetrics.unreserveResource(RMNodeLabelsManager.NO_LABEL, user, Resources.createResource(3*GB, 3)); rmChecker = ResourceMetricsChecker - .createFromChecker(rmChecker, queueSource) + .createFromChecker(rmChecker, mqs.leafQueueSource) .gaugeLong(AVAILABLE_MB, 100 * GB) .gaugeInt(AVAILABLE_V_CORES, 100) .gaugeLong(ALLOCATED_MB, 4 * GB) @@ -543,46 +564,46 @@ public void testTwoLevelWithUserMetrics() { .gaugeInt(RESERVED_CONTAINERS, 0) .checkAll(); rmChecker = ResourceMetricsChecker - .createFromChecker(rmChecker, parentQueueSource) + .createFromChecker(rmChecker, mqs.parentQueueSource) .checkAll(); rmChecker = ResourceMetricsChecker - .createFromChecker(rmChecker, userSource) + .createFromChecker(rmChecker, mqs.userSource) .gaugeLong(AVAILABLE_MB, 10 * GB) .gaugeInt(AVAILABLE_V_CORES, 10) 
.checkAll(); ResourceMetricsChecker - .createFromChecker(rmChecker, parentUserSource) + .createFromChecker(rmChecker, mqs.parentUserSource) .checkAll(); - metrics.finishAppAttempt( + mqs.leafMetrics.finishAppAttempt( app.getApplicationId(), app.isPending(), app.getUser()); appMetricsChecker = AppMetricsChecker - .createFromChecker(appMetricsChecker, queueSource, true) + .createFromChecker(appMetricsChecker, mqs.leafQueueSource, true) .gaugeInt(APPS_RUNNING, 0) .checkAll(); appMetricsChecker = AppMetricsChecker - .createFromChecker(appMetricsChecker, parentQueueSource, true) + .createFromChecker(appMetricsChecker, mqs.parentQueueSource, true) .checkAll(); appMetricsChecker = AppMetricsChecker - .createFromChecker(appMetricsChecker, userSource, true) + .createFromChecker(appMetricsChecker, mqs.userSource, true) .checkAll(); appMetricsChecker = AppMetricsChecker - .createFromChecker(appMetricsChecker, parentUserSource, true) + .createFromChecker(appMetricsChecker, mqs.parentUserSource, true) .checkAll(); - metrics.finishApp(user, RMAppState.FINISHED); + mqs.leafMetrics.finishApp(user, RMAppState.FINISHED); appMetricsChecker = AppMetricsChecker - .createFromChecker(appMetricsChecker, queueSource, true) + .createFromChecker(appMetricsChecker, mqs.leafQueueSource, true) .counter(APPS_COMPLETED, 1) .checkAll(); appMetricsChecker = AppMetricsChecker - .createFromChecker(appMetricsChecker, parentQueueSource, true) + .createFromChecker(appMetricsChecker, mqs.parentQueueSource, true) .checkAll(); appMetricsChecker = AppMetricsChecker - .createFromChecker(appMetricsChecker, userSource, true) + .createFromChecker(appMetricsChecker, mqs.userSource, true) .checkAll(); AppMetricsChecker - .createFromChecker(appMetricsChecker, parentUserSource, true) + .createFromChecker(appMetricsChecker, mqs.parentUserSource, true) .checkAll(); } @@ -664,7 +685,7 @@ private static void checkAggregatedNodeTypes(MetricsSource source, assertCounter("AggregateOffSwitchContainersAllocated", offSwitch, 
rb); } - private static AppSchedulingInfo mockApp(String user) { + static AppSchedulingInfo mockApp(String user) { AppSchedulingInfo app = mock(AppSchedulingInfo.class); when(app.getUser()).thenReturn(user); ApplicationId appId = BuilderUtils.newApplicationId(1, 1); @@ -677,7 +698,7 @@ public static MetricsSource queueSource(MetricsSystem ms, String queue) { return ms.getSource(QueueMetrics.sourceName(queue).toString()); } - private static MetricsSource userSource(MetricsSystem ms, String queue, + static MetricsSource userSource(MetricsSystem ms, String queue, String user) { return ms.getSource(QueueMetrics.sourceName(queue). append(",user=").append(user).toString()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetricsForCustomResources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetricsForCustomResources.java new file mode 100644 index 00000000000..2ee738cb564 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetricsForCustomResources.java @@ -0,0 +1,480 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.impl.MetricsSystemImpl; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetricsForCustomResources.MetricsForCustomResource; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .TestQueueMetrics.MultiQueueSetup; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_MB; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_V_CORES; +import static 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestQueueMetrics.queueSource; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class TestQueueMetricsForCustomResources { + private static final ResourceMetricsChecker INITIAL_RESOURCE_METRICS_CHECKER + = ResourceMetricsChecker.createInitial(); + + public static final long GB = 1024; // MB + private static final Configuration conf = new Configuration(); + private static final String CUSTOM_RES_1 = "custom_res_1"; + private static final String CUSTOM_RES_2 = "custom_res_2"; + private static final String ROOT_QUEUE_NAME = "root"; + private static final String LEAF_QUEUE_NAME = "root.leaf"; + public static final String USER = "alice"; + + private MetricsSystem ms; + + @Before + public void setUp() { + ms = new MetricsSystemImpl(); + QueueMetrics.clearQueueMetrics(); + initializeResourceTypes(); + } + + private void initializeResourceTypes() { + Map riMap = new HashMap<>(); + + ResourceInformation memory = ResourceInformation.newInstance( + ResourceInformation.MEMORY_MB.getName(), + ResourceInformation.MEMORY_MB.getUnits(), + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); + ResourceInformation vcores = ResourceInformation.newInstance( + ResourceInformation.VCORES.getName(), + ResourceInformation.VCORES.getUnits(), + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, + DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); + ResourceInformation res1 = ResourceInformation.newInstance(CUSTOM_RES_1, + ResourceInformation.VCORES.getUnits(), 0, 2000); + ResourceInformation res2 = ResourceInformation.newInstance(CUSTOM_RES_2, + ResourceInformation.VCORES.getUnits(), 0, 2000); + + riMap.put(ResourceInformation.MEMORY_URI, memory); + riMap.put(ResourceInformation.VCORES_URI, vcores); + riMap.put(CUSTOM_RES_1, res1); + riMap.put(CUSTOM_RES_2, res2); + 
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap); + } + + static void assertCustomResourceValue(QueueMetrics metrics, + MetricsForCustomResource metricsType, String resourceName, + long expectedValue) { + Map<String, Long> metricsValues = metrics.queueMetricsForCustomResources + .getMetricsByType(metricsType); + Long value = metricsValues.get(resourceName); + assertCustomResourceValueInternal(metricsType, resourceName, + expectedValue, value); + } + + static void assertCustomResourceValueInternal( + MetricsForCustomResource metricsType, String resourceName, long + expectedValue, Long value) { + assertNotNull( + "QueueMetrics should have custom resource metrics value for resource: " + + resourceName, value); + assertEquals(String.format( + "QueueMetrics should have custom resource metrics value %d " + + "for resource: %s for metrics type %s", + expectedValue, resourceName, metricsType), expectedValue, + (long) value); + } + + @Test + public void testSetAvailableResourcesToQueue1() { + String queueName = "single"; + QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, + false, conf); + MetricsSource queueSource = queueSource(ms, queueName); + + metrics.setAvailableResourcesToQueue(ResourceTypesTestHelper.newResource( + GB, 4, + ImmutableMap.<String, String>
builder() + .put(CUSTOM_RES_1, String.valueOf(5 * GB)) + .put(CUSTOM_RES_2, String.valueOf(6 * GB)) + .build())); + ResourceMetricsChecker.createFromChecker(INITIAL_RESOURCE_METRICS_CHECKER, + queueSource) + .gaugeLong(AVAILABLE_MB, GB) + .gaugeInt(AVAILABLE_V_CORES, 4) + .checkAll(); + + assertCustomResourceValue(metrics, + MetricsForCustomResource.AVAILABLE, CUSTOM_RES_1, 5 * GB); + assertCustomResourceValue(metrics, + MetricsForCustomResource.AVAILABLE, CUSTOM_RES_2, 6 * GB); + } + + @Test + public void testSetAvailableResourcesToQueue2() { + String queueName = "single"; + QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, + false, conf); + MetricsSource queueSource = queueSource(ms, queueName); + + metrics.setAvailableResourcesToQueue(null, + ResourceTypesTestHelper.newResource(GB, 4, + ImmutableMap.<String, String> builder() + .put(CUSTOM_RES_1, String.valueOf(15 * GB)) + .put(CUSTOM_RES_2, String.valueOf(20 * GB)) + .build())); + ResourceMetricsChecker.createFromChecker(INITIAL_RESOURCE_METRICS_CHECKER, + queueSource) + .gaugeLong(AVAILABLE_MB, GB) + .gaugeInt(AVAILABLE_V_CORES, 4) + .checkAll(); + + assertCustomResourceValue(metrics, + MetricsForCustomResource.AVAILABLE, CUSTOM_RES_1, 15 * GB); + assertCustomResourceValue(metrics, + MetricsForCustomResource.AVAILABLE, CUSTOM_RES_2, 20 * GB); + } + + @Test + public void testIncreasePendingResources() { + QueueMetricsTestcase testcase = QueueMetricsTestcase.Builder.create() + .withMetricSystem(ms) + .withParentQueueName(ROOT_QUEUE_NAME) + .withLeafQueueName(LEAF_QUEUE_NAME) + .withUser(USER) + .withPartition(RMNodeLabelsManager.NO_LABEL) + .withContainers(5) + .withContainersToDecrease(2) + .withVCores(4) + .withVCoresToDecrease(2) + .withMemoryMB(4 * GB) + .withMemoryMBToDecrease(GB) + .withCustomResourceValue(CUSTOM_RES_1, 15 * GB) + .withCustomResourceValue(CUSTOM_RES_2, 20 * GB) + .withCustomResToDecrease(2 * GB) + .build(); + + testcase.testIncreasePendingResources(); + } + + @Test + public void 
testDecreasePendingResources() { + QueueMetricsTestcase testcase = QueueMetricsTestcase.Builder.create() + .withMetricSystem(ms) + .withParentQueueName(ROOT_QUEUE_NAME) + .withLeafQueueName(LEAF_QUEUE_NAME) + .withUser(USER) + .withPartition(RMNodeLabelsManager.NO_LABEL) + .withContainers(5) + .withContainersToDecrease(2) + .withVCores(4) + .withVCoresToDecrease(2) + .withMemoryMB(4 * GB) + .withMemoryMBToDecrease(GB) + .withCustomResourceValue(CUSTOM_RES_1, 15 * GB) + .withCustomResourceValue(CUSTOM_RES_2, 20 * GB) + .withCustomResToDecrease(2 * GB) + .build(); + + testcase.testIncreasePendingResources(); + testcase.testDecreasePendingResources(); + } + + @Test + public void testAllocateResourcesWithoutDecreasePending() { + QueueMetricsTestcase testcase = QueueMetricsTestcase.Builder.create() + .withMetricSystem(ms) + .withParentQueueName(ROOT_QUEUE_NAME) + .withLeafQueueName(LEAF_QUEUE_NAME) + .withUser(USER) + .withPartition(RMNodeLabelsManager.NO_LABEL) + .withContainers(5) + .withVCores(4) + .withMemoryMB(4 * GB) + .withCustomResourceValue(CUSTOM_RES_1, 15 * GB) + .withCustomResourceValue(CUSTOM_RES_2, 20 * GB) + .build(); + + testcase.testAllocateResources(false); + } + + @Test + public void testAllocateResourcesWithDecreasePending() { + QueueMetricsTestcase testcase = QueueMetricsTestcase.Builder.create() + .withMetricSystem(ms) + .withParentQueueName(ROOT_QUEUE_NAME) + .withLeafQueueName(LEAF_QUEUE_NAME) + .withUser(USER) + .withPartition(RMNodeLabelsManager.NO_LABEL) + .withContainers(5) + .withContainersToDecrease(2) + .withVCores(4) + .withVCoresToDecrease(2) + .withMemoryMB(4 * GB) + .withMemoryMBToDecrease(GB) + .withCustomResourceValue(CUSTOM_RES_1, 15 * GB) + .withCustomResourceValue(CUSTOM_RES_2, 20 * GB) + .withCustomResToDecrease(2 * GB) + .build(); + + //first, increase pending resources + testcase.testIncreasePendingResources(); + + //then allocate with decrease pending resources + testcase.testAllocateResources(true); + } + + @Test + public 
void testAllocateResourcesWithoutContainer() { + QueueMetricsTestcase testcase = QueueMetricsTestcase.Builder.create() + .withMetricSystem(ms) + .withParentQueueName(ROOT_QUEUE_NAME) + .withLeafQueueName(LEAF_QUEUE_NAME) + .withUser(USER) + .withPartition(RMNodeLabelsManager.NO_LABEL) + .withVCores(4) + .withMemoryMB(4 * GB) + .withCustomResourceValue(CUSTOM_RES_1, 15 * GB) + .withCustomResourceValue(CUSTOM_RES_2, 20 * GB) + .build(); + + //first, increase pending resources + testcase.testIncreasePendingResourcesWithoutContainer(); + + testcase.testAllocateResourcesWithoutContainer(); + } + + @Test + public void testReleaseResources() { + QueueMetricsTestcase testcase = QueueMetricsTestcase.Builder.create() + .withMetricSystem(ms) + .withParentQueueName(ROOT_QUEUE_NAME) + .withLeafQueueName(LEAF_QUEUE_NAME) + .withUser(USER) + .withPartition(RMNodeLabelsManager.NO_LABEL) + .withContainers(5) + .withContainersToDecrease(2) + .withVCores(4) + .withVCoresToDecrease(2) + .withMemoryMB(4 * GB) + .withMemoryMBToDecrease(GB) + .withCustomResourceValue(CUSTOM_RES_1, 15 * GB) + .withCustomResourceValue(CUSTOM_RES_2, 20 * GB) + .withCustomResToDecrease(2 * GB) + .build(); + + //first, allocate some resources + testcase.testAllocateResources(false); + + testcase.testReleaseResourcesWithContainers(); + } + + @Test + public void testUpdatePreemptedSecondsForCustomResources() { + QueueMetricsTestcase testcase = QueueMetricsTestcase.Builder.create() + .withMetricSystem(ms) + .withParentQueueName(ROOT_QUEUE_NAME) + .withLeafQueueName(LEAF_QUEUE_NAME) + .withUser(USER) + .withPartition(RMNodeLabelsManager.NO_LABEL) + .withContainers(5) + .withVCores(4) + .withMemoryMB(4 * GB) + .withCustomResourceValue(CUSTOM_RES_1, 15 * GB) + .withCustomResourceValue(CUSTOM_RES_2, 20 * GB) + .build(); + + final int seconds = 1; + testcase.testUpdatePreemptedSeconds(seconds); + } + + @Test + public void testUpdatePreemptedSecondsForCustomResources2() { + QueueMetricsTestcase testcase = 
QueueMetricsTestcase.Builder.create() + .withMetricSystem(ms) + .withParentQueueName(ROOT_QUEUE_NAME) + .withLeafQueueName(LEAF_QUEUE_NAME) + .withUser(USER) + .withPartition(RMNodeLabelsManager.NO_LABEL) + .withContainers(5) + .withVCores(4) + .withMemoryMB(4 * GB) + .withCustomResourceValue(CUSTOM_RES_1, 15 * GB) + .withCustomResourceValue(CUSTOM_RES_2, 20 * GB) + .build(); + + final int seconds = 15; + testcase.testUpdatePreemptedSeconds(seconds); + } + + @Test + public void testReserveResources() { + QueueMetricsTestcase testcase = QueueMetricsTestcase.Builder.create() + .withMetricSystem(ms) + .withParentQueueName(ROOT_QUEUE_NAME) + .withLeafQueueName(LEAF_QUEUE_NAME) + .withUser(USER) + .withPartition(RMNodeLabelsManager.NO_LABEL) + .withContainers(5) + .withVCores(4) + .withMemoryMB(4 * GB) + .withCustomResourceValue(CUSTOM_RES_1, 15 * GB) + .withCustomResourceValue(CUSTOM_RES_2, 20 * GB) + .build(); + + testcase.testReserveResources(); + } + + @Test + public void testUnreserveResources() { + QueueMetricsTestcase testcase = QueueMetricsTestcase.Builder.create() + .withMetricSystem(ms) + .withParentQueueName(ROOT_QUEUE_NAME) + .withLeafQueueName(LEAF_QUEUE_NAME) + .withUser(USER) + .withPartition(RMNodeLabelsManager.NO_LABEL) + .withContainers(5) + .withVCores(4) + .withMemoryMB(4 * GB) + .withCustomResourceValue(CUSTOM_RES_1, 15 * GB) + .withCustomResourceValue(CUSTOM_RES_2, 20 * GB) + .build(); + + testcase.testReserveResources(); + testcase.testUnreserveResources(); + } + + @Test + public void testGetAllocatedResourcesWithCustomResources() { + QueueMetricsTestcase testcase = QueueMetricsTestcase.Builder.create() + .withMetricSystem(ms) + .withParentQueueName(ROOT_QUEUE_NAME) + .withLeafQueueName(LEAF_QUEUE_NAME) + .withUser(USER) + .withPartition(RMNodeLabelsManager.NO_LABEL) + .withContainers(5) + .withVCores(4) + .withMemoryMB(4 * GB) + .withCustomResourceValue(CUSTOM_RES_1, 15 * GB) + .withCustomResourceValue(CUSTOM_RES_2, 20 * GB) + .build(); + + 
testcase.testGetAllocatedResources(); + } + + @Test + public void testGetAllocatedResourcesWithoutCustomResources() { + QueueMetricsTestcase testcase = QueueMetricsTestcase.Builder.create() + .withMetricSystem(ms) + .withParentQueueName(ROOT_QUEUE_NAME) + .withLeafQueueName(LEAF_QUEUE_NAME) + .withUser(USER) + .withPartition(RMNodeLabelsManager.NO_LABEL) + .withContainers(5) + .withVCores(4) + .withMemoryMB(4 * GB) + .build(); + + testcase.testGetAllocatedResources(); + } + + @Test + public void testGetMaxUtilizationOfCustomResourcesWithoutAllocatedCustomResource() { + MultiQueueSetup mqs = new MultiQueueSetup(ms, + ROOT_QUEUE_NAME, LEAF_QUEUE_NAME, USER); + Resource clusterResource = ResourceTypesTestHelper.newResource(4 * GB, 8, + ImmutableMap.builder() + .put(CUSTOM_RES_1, + String.valueOf(0L)) + .put(CUSTOM_RES_2, + String.valueOf(0L)) + .build()); + float utilization = mqs.leafMetrics + .getMaxUtilizationOfCustomResources(clusterResource); + assertEquals(0, utilization, 0.0); + } + + @Test + public void testGetMaxUtilizationOfCustomResourcesWithAllocatedCustomResource() { + MultiQueueSetup mqs = new MultiQueueSetup(ms, + ROOT_QUEUE_NAME, LEAF_QUEUE_NAME, USER); + + Resource allocatedResource = ResourceTypesTestHelper.newResource(4 * GB, 8, + ImmutableMap.builder() + .put(CUSTOM_RES_1, + String.valueOf(10L)) + .put(CUSTOM_RES_2, + String.valueOf(20L)) + .build()); + mqs.leafMetrics.allocateResources(RMNodeLabelsManager.NO_LABEL, mqs.user, 1, + allocatedResource, false); + + Resource clusterResource = ResourceTypesTestHelper.newResource(4 * GB, 8, + ImmutableMap.builder() + .put(CUSTOM_RES_1, + String.valueOf(0L)) + .put(CUSTOM_RES_2, + String.valueOf(0L)) + .build()); + float utilization = mqs.leafMetrics + .getMaxUtilizationOfCustomResources(clusterResource); + assertEquals(0, utilization, 0.0); + } + + @Test + public void testGetMaxUtilizationOfCustomResourcesWithAllocatedCustomResource2() { + MultiQueueSetup mqs = new MultiQueueSetup(ms, + ROOT_QUEUE_NAME, 
LEAF_QUEUE_NAME, USER); + + Resource allocatedResource = ResourceTypesTestHelper.newResource(4 * GB, 8, + ImmutableMap.builder() + .put(CUSTOM_RES_1, + String.valueOf(10L)) + .put(CUSTOM_RES_2, + String.valueOf(20L)) + .build()); + mqs.leafMetrics.allocateResources(RMNodeLabelsManager.NO_LABEL, mqs.user, 1, + allocatedResource, false); + + Resource clusterResource = ResourceTypesTestHelper.newResource(4 * GB, 8, + ImmutableMap.builder() + .put(CUSTOM_RES_1, + String.valueOf(5L)) + .put(CUSTOM_RES_2, + String.valueOf(5L)) + .build()); + float utilization = mqs.leafMetrics + .getMaxUtilizationOfCustomResources(clusterResource); + assertEquals(4.0, utilization, 0.0); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index 3ac3849cf73..e65df268f94 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; @@ -60,6 +61,7 @@ import 
java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; public class FairSchedulerTestBase { public final static String TEST_DIR = @@ -355,4 +357,15 @@ protected void addNode(int memory, int cores) { scheduler.handle(new NodeAddedSchedulerEvent(node)); rmNodes.add(node); } + + protected void addNode(int memory, int cores, + Map customResources) { + int id = rmNodes.size() + 1; + RMNode node = MockNodes.newNodeInfo(1, + ResourceTypesTestHelper.newResource(memory, cores, customResources), + id, "127.0.0." + id); + scheduler.handle(new NodeAddedSchedulerEvent(node)); + rmNodes.add(node); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java index da6428a8b5a..7d7aee8acbf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java @@ -17,21 +17,25 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import 
org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.junit.After; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -46,6 +50,13 @@ import java.util.Collection; import java.util.List; +import static org.apache.hadoop.yarn.util.resource.ResourceUtils.UNITS; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeThat; + /** * Tests to verify fairshare and minshare preemption, using parameterization. 
*/ @@ -60,8 +71,12 @@ // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore) private static final int NODE_CAPACITY_MULTIPLE = 4; + private static final String CUSTOM_RES_1 = "custom_res_1"; + private static final String CUSTOM_RES_2 = "custom_res_2"; + private final boolean fairsharePreemption; private final boolean drf; + private final boolean useCustomResources; // App that takes up the entire cluster private FSAppAttempt greedyApp; @@ -69,20 +84,22 @@ // Starving app that is expected to instigate preemption private FSAppAttempt starvingApp; - @Parameterized.Parameters(name = "{0}") + @Parameterized.Parameters(name = "{0} - {2}") public static Collection getParameters() { return Arrays.asList(new Object[][] { - {"MinSharePreemption", 0}, - {"MinSharePreemptionWithDRF", 1}, - {"FairSharePreemption", 2}, - {"FairSharePreemptionWithDRF", 3} + {"MinSharePreemption", 0, false}, + {"MinSharePreemptionWithDRF", 1, false}, + {"FairSharePreemption", 2, false}, + {"FairSharePreemptionWithDRF", 3, false}, + {"FairSharePreemptionWithDRF", 3, true}, }); } - public TestFairSchedulerPreemption(String name, int mode) - throws IOException { + public TestFairSchedulerPreemption(String name, int mode, + boolean useCustomResources) throws IOException { fairsharePreemption = (mode > 1); // 2 and 3 drf = (mode % 2 == 1); // 1 and 3 + this.useCustomResources = useCustomResources; writeAllocFile(); } @@ -94,9 +111,21 @@ public void setup() throws IOException { conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true); conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f); conf.setInt(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 0); + if (useCustomResources) { + registerCustomResource(CUSTOM_RES_1); + registerCustomResource(CUSTOM_RES_2); + String resourceNames = Joiner.on(',').join(CUSTOM_RES_1, CUSTOM_RES_2); + conf.set(YarnConfiguration.RESOURCE_TYPES, resourceNames); + ResourceUtils.resetResourceTypes(conf); + } setupCluster(); } + private void 
registerCustomResource(String name) { + conf.set(YarnConfiguration.RESOURCE_TYPES + "." + name + UNITS, + "G"); + } + @After public void teardown() { ALLOC_FILE.delete(); @@ -185,7 +214,12 @@ private void writePreemptionParams(PrintWriter out) { private void writeResourceParams(PrintWriter out) { if (!fairsharePreemption) { - out.println("4096mb,4vcores"); + if (useCustomResources) { + out.println("memory-mb=4096,vcores=4," + + "custom_res_1=20,custom_res_2=40"); + } else { + out.println("4096mb,4vcores"); + } } } @@ -200,8 +234,18 @@ private void setupCluster() throws IOException { // Create and add two nodes to the cluster, with capacities // disproportional to the container requests. - addNode(NODE_CAPACITY_MULTIPLE * GB, 3 * NODE_CAPACITY_MULTIPLE); - addNode(NODE_CAPACITY_MULTIPLE * GB, 3 * NODE_CAPACITY_MULTIPLE); + if (useCustomResources) { + ImmutableMap customResources = ImmutableMap + . builder().put(CUSTOM_RES_1, String.valueOf(40)) + .put(CUSTOM_RES_2, String.valueOf(80)).build(); + addNode(NODE_CAPACITY_MULTIPLE * GB, 3 * NODE_CAPACITY_MULTIPLE, + customResources); + addNode(NODE_CAPACITY_MULTIPLE * GB, 3 * NODE_CAPACITY_MULTIPLE, + customResources); + } else { + addNode(NODE_CAPACITY_MULTIPLE * GB, 3 * NODE_CAPACITY_MULTIPLE); + addNode(NODE_CAPACITY_MULTIPLE * GB, 3 * NODE_CAPACITY_MULTIPLE); + } // Reinitialize the scheduler so DRF policy picks up cluster capacity // TODO (YARN-6194): One shouldn't need to call this @@ -236,9 +280,38 @@ private void takeAllResources(String queueName) { ApplicationAttemptId appAttemptId = createSchedulingRequest(GB, 1, queueName, "default", NODE_CAPACITY_MULTIPLE * rmNodes.size()); + takeResourcesInternal(queueName, appAttemptId); + } + + private void takeAllResourcesOfCustomResource(String queueName, + String customRes) { + // Create an app that takes up all the resources of a custom resource on the + // cluster + int numContainers = NODE_CAPACITY_MULTIPLE * rmNodes.size(); + ResourceRequest resourceRequest = + 
createResourceRequest(0, 0, ResourceRequest.ANY, 1, numContainers, + true); + Resource capability = resourceRequest.getCapability(); + resourceRequest.setCapability( + ResourceTypesTestHelper.newResource(capability.getMemorySize(), + capability.getVirtualCores(), + ImmutableMap.builder() + .put(customRes, String.valueOf(10)) + .build())); + + ApplicationAttemptId appAttemptId = createSchedulingRequest( + Lists.newArrayList(resourceRequest), queueName, "default"); + takeResourcesInternal(queueName, appAttemptId); + } + + private void takeResourcesInternal(String queueName, + ApplicationAttemptId appAttemptId) { greedyApp = scheduler.getSchedulerApp(appAttemptId); scheduler.update(); sendEnoughNodeUpdatesToAssignFully(); + + // Callers request NODE_CAPACITY_MULTIPLE * rmNodes.size() = 8 containers + // which exhaust memory (or the requested custom resource) cluster-wide assertEquals(8, greedyApp.getLiveContainers().size()); // Verify preemptable for queue and app attempt assertTrue( @@ -251,11 +324,8 @@ private void takeAllResources(String queueName) { * cluster. * * @param queueName queue name - * @throws InterruptedException - * if any thread has interrupted the current thread. 
*/ - private void preemptHalfResources(String queueName) - throws InterruptedException { + private void preemptHalfResources(String queueName) { ApplicationAttemptId appAttemptId = createSchedulingRequest(2 * GB, 2, queueName, "default", NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2); @@ -266,6 +336,29 @@ private void preemptHalfResources(String queueName) scheduler.update(); } + private void preemptHalfResourcesCustomResource(String queueName, + String customRes) { + int numContainers = NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2; + + // cannot request 0 memory as it will lead to division by zero when + // calculating Resource ratios + ResourceRequest resourceRequest = createResourceRequest(1, 0, + ResourceRequest.ANY, 1, numContainers, true); + Resource capability = resourceRequest.getCapability(); + resourceRequest.setCapability(ResourceTypesTestHelper.newResource( + capability.getMemorySize(), capability.getVirtualCores(), + ImmutableMap. builder() + .put(customRes, String.valueOf(20)).build())); + + ApplicationAttemptId appAttemptId = createSchedulingRequest( + Lists.newArrayList(resourceRequest), queueName, "default"); + starvingApp = scheduler.getSchedulerApp(appAttemptId); + + // Move clock enough to identify starvation + clock.tickSec(1); + scheduler.update(); + } + /** * Submit application to {@code queue1} and take over the entire cluster. 
* Submit application with larger containers to {@code queue2} that @@ -275,11 +368,22 @@ private void preemptHalfResources(String queueName) * @param queue2 second queue - * @throws InterruptedException if interrupted while waiting */ - private void submitApps(String queue1, String queue2) - throws InterruptedException { + private void submitApps(String queue1, String queue2) { takeAllResources(queue1); preemptHalfResources(queue2); } + + /** + * Custom-resource counterpart of {@code submitApps}. + * + * @param queue1 queue of the app that takes every unit of the resource + * @param queue2 queue of the app that is left starving for it + * @param customRes name of the custom resource both apps request + */ + private void submitAppsAndTakeAllOfCustomResource(String queue1, + String queue2, String customRes) { + takeAllResourcesOfCustomResource(queue1, customRes); + preemptHalfResourcesCustomResource(queue2, customRes); + } private void verifyPreemption(int numStarvedAppContainers, int numGreedyAppContainers) @@ -350,6 +454,20 @@ public void testPreemptionWithinSameLeafQueue() throws Exception { } } + @Test + public void testPreemptionWithinSameLeafQueueCustomResource() + throws Exception { + // Skip, not pass, on parameter sets without custom resources. + assumeThat(useCustomResources, is(true)); + String queue = "root.preemptable.child-1"; + submitAppsAndTakeAllOfCustomResource(queue, queue, CUSTOM_RES_1); + if (fairsharePreemption) { + verifyPreemption(2, 4); + } else { + verifyNoPreemption(); + } + } + @Test public void testPreemptionBetweenTwoSiblingLeafQueues() throws Exception { submitApps("root.preemptable.child-1", "root.preemptable.child-2");