diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java new file mode 100644 index 0000000..a0e6d8c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java @@ -0,0 +1,191 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; + +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; + +public class QueueCapacities { + private static final String NL = CommonNodeLabelsManager.NO_LABEL; + private static final float LABEL_DOESNT_EXIST_CAP = 0f; + private Map capacitiesMap; + private ReadLock readLock; + private WriteLock writeLock; + + public QueueCapacities() { + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + readLock = lock.readLock(); + writeLock = lock.writeLock(); + + capacitiesMap = new HashMap(); + } + + // Usage enum here to make implement cleaner + private enum CapacityType { + USED_CAP(0), ABS_USED_CAP(1), MAX_CAP(2), ABS_MAX_CAP(3), CAP(4), ABS_CAP(5); + + private int idx; + + private CapacityType(int idx) { + this.idx = idx; + } + } + + private static class Capacities { + private float[] capacitiesArr; + + public Capacities() { + capacitiesArr = new float[CapacityType.values().length]; + } + } + + private float _get(String label, CapacityType type) { + try { + readLock.lock(); + Capacities cap = capacitiesMap.get(label); + if (null == cap) { + return LABEL_DOESNT_EXIST_CAP; + } + return cap.capacitiesArr[type.idx]; + } finally { + readLock.unlock(); + } + } + + private void _set(String label, CapacityType type, float value) { + try { + writeLock.lock(); + Capacities cap = capacitiesMap.get(label); + if (null == cap) { + cap = new Capacities(); + capacitiesMap.put(label, cap); + } + cap.capacitiesArr[type.idx] = value; + } finally { + writeLock.unlock(); + } + } + + /* Used Capacity Getter and Setter */ + public float getUsedCapacity() { + return _get(NL, CapacityType.USED_CAP); + } + + public float getUsedCapacity(String label) { + return _get(label, CapacityType.USED_CAP); + } + + public void setUsedCapacity(float value) { + _set(NL, CapacityType.USED_CAP, value); + } + + public void setUsedCapacity(String label, float value) { + _set(label, CapacityType.USED_CAP, value); + } + + /* Absolute Used Capacity Getter and Setter */ + public float getAbsoluteUsedCapacity() { + return _get(NL, CapacityType.ABS_USED_CAP); + } + + public float getAbsoluteUsedCapacity(String label) { + return _get(label, CapacityType.ABS_USED_CAP); + } + + public void setAbsoluteUsedCapacity(float value) { + _set(NL, CapacityType.ABS_USED_CAP, value); + } + + public void setAbsoluteUsedCapacity(String label, float value) { + _set(label, CapacityType.ABS_USED_CAP, value); + } + + /* Capacity Getter and Setter */ + public float getCapacity() { + return _get(NL, CapacityType.CAP); + } + + public float getCapacity(String label) { + return _get(label, CapacityType.CAP); + } + + public void setCapacity(float value) { + _set(NL, CapacityType.CAP, value); + } + + public void setCapacity(String label, float value) { + _set(label, CapacityType.CAP, value); + } + + /* Absolute Capacity Getter and Setter */ + public float getAbsoluteCapacity() { + return _get(NL, CapacityType.ABS_CAP); + } + + public float getAbsoluteCapacity(String label) { + return _get(label, CapacityType.ABS_CAP); + } + + public void setAbsoluteCapacity(float value) { + _set(NL, CapacityType.ABS_CAP, value); + } + + public void setAbsoluteCapacity(String label, float value) { + _set(label, CapacityType.ABS_CAP, value); + } + + /* Maximum Capacity Getter and Setter */ + public float getMaximumCapacity() { + return _get(NL, CapacityType.MAX_CAP); + } + + public float getMaximumCapacity(String label) { + return _get(label, CapacityType.MAX_CAP); + } + + public void setMaximumCapacity(float value) { + _set(NL, CapacityType.MAX_CAP, value); + } + + public void setMaximumCapacity(String label, float value) { + _set(label, CapacityType.MAX_CAP, value); + } + + /* Absolute Maximum Capacity Getter and Setter */ + public float getAbsoluteMaximumCapacity() { + return _get(NL, CapacityType.ABS_MAX_CAP); + } + + public float getAbsoluteMaximumCapacity(String label) { + return _get(label, CapacityType.ABS_MAX_CAP); + } + + public void setAbsoluteMaximumCapacity(float value) { + _set(NL, CapacityType.ABS_MAX_CAP, value); + } + + public void setAbsoluteMaximumCapacity(String label, float value) { + _set(label, CapacityType.ABS_MAX_CAP, value); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacities.java new file mode 100644 index 0000000..6d2a421 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacities.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.Collection; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.Resource; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestQueueCapacities { + private static final Log LOG = LogFactory.getLog(TestQueueCapacities.class); + private String suffix; + + @Parameterized.Parameters + public static Collection getParameters() { + return Arrays.asList(new String[][] { + { "Capacity" }, + { "AbsoluteCapacity" }, + { "UsedCapacity" }, + { "AbsoluteUsedCapacity" }, + { "MaximumCapacity" }, + { "AbsoluteMaximumCapacity" } }); + } + + public TestQueueCapacities(String suffix) { + this.suffix = suffix; + } + + private static float get(QueueCapacities obj, String suffix, + String label) throws Exception { + return executeByName(obj, "get" + suffix, label, -1f); + } + + private static void set(QueueCapacities obj, String suffix, + String label, float value) throws Exception { + executeByName(obj, "set" + suffix, label, value); + } + + // Use reflection to avoid too much avoid code + private static float executeByName(QueueCapacities obj, String methodName, + String label, float value) throws Exception { + // We have 4 kinds of method + // 1. getXXX() : float + // 2. getXXX(label) : float + // 3. setXXX(float) : void + // 4. setXXX(label, float) : void + if (methodName.startsWith("get")) { + float result; + if (label == null) { + // 1. + Method method = QueueCapacities.class.getDeclaredMethod(methodName); + result = (float) method.invoke(obj); + } else { + // 2. + Method method = + QueueCapacities.class.getDeclaredMethod(methodName, String.class); + result = (float) method.invoke(obj, label); + } + return result; + } else { + if (label == null) { + // 3. + Method method = + QueueCapacities.class.getDeclaredMethod(methodName, Float.TYPE); + method.invoke(obj, value); + } else { + // 4. + Method method = + QueueCapacities.class.getDeclaredMethod(methodName, String.class, + Float.TYPE); + method.invoke(obj, label, value); + } + return -1f; + } + } + + private void internalTestModifyAndRead(String label) throws Exception { + QueueCapacities qc = new QueueCapacities(); + + // First get returns 0 always + Assert.assertEquals(0f, get(qc, suffix, label), 1e-8); + + // Set to 1, and check + set(qc, suffix, label, 1f); + Assert.assertEquals(1f, get(qc, suffix, label), 1e-8); + + // Set to 2, and check + set(qc, suffix, label, 2f); + Assert.assertEquals(2f, get(qc, suffix, label), 1e-8); + } + + void check(int mem, int cpu, Resource res) { + Assert.assertEquals(mem, res.getMemory()); + Assert.assertEquals(cpu, res.getVirtualCores()); + } + + @Test + public void testModifyAndRead() throws Exception { + LOG.info("Test - " + suffix); + internalTestModifyAndRead(null); + internalTestModifyAndRead("label"); + } +} \ No newline at end of file