diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index e71ddff..0c7383f 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.api.records.ValueRanges; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; @@ -223,8 +224,36 @@ public Integer getDecommissioningTimeout() { public Resource getPhysicalResource() { return null; } + + @Override + public void setLocalUsedPortsSnapshot(ValueRanges ports) { + } + + @Override + public ValueRanges getAvailablePorts() { + return null; + } + + @Override + public void setAvailablePorts(ValueRanges ports) { + } + + @Override + public ValueRanges getContainerAllocatedPorts() { + return null; + } + + @Override + public void setContainerAllocatedPorts(ValueRanges ports) { + } + + @Override + public ValueRanges getLocalUsedPortsSnapshot() { + return null; + } } + public static RMNode newNodeInfo(String rackName, String hostName, final Resource resource, int port) { final NodeId nodeId = newNodeID(hostName, port); diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index 6b7ac3c..e27bb24 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.api.records.ValueRanges; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; @@ -212,4 +213,34 @@ public Integer getDecommissioningTimeout() { public Resource getPhysicalResource() { return null; } + + @Override + public ValueRanges getAvailablePorts() { + return node.getAvailablePorts(); + } + + @Override + public void setAvailablePorts(ValueRanges ports) { + node.setAvailablePorts(ports); + } + + @Override + public ValueRanges getContainerAllocatedPorts() { + return node.getContainerAllocatedPorts(); + } + + @Override + public void setContainerAllocatedPorts(ValueRanges ports) { + node.setContainerAllocatedPorts(ports); + } + + @Override + public ValueRanges getLocalUsedPortsSnapshot() { + return node.getLocalUsedPortsSnapshot(); + } + + @Override + public void setLocalUsedPortsSnapshot(ValueRanges ports) { + node.setLocalUsedPortsSnapshot(ports); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java index 0fd41a2..f538885 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java @@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.util.Records; /** @@ -55,6 +56,7 @@ private static class SimpleResource extends Resource { private long memory; private long vcores; + private ValueRanges ports; SimpleResource(long memory, long vcores) { this.memory = memory; this.vcores = vcores; @@ -83,20 +85,32 @@ public int getVirtualCores() { public void setVirtualCores(int vcores) { this.vcores = vcores; } + @Override + public ValueRanges getPorts() { + return ports; + } + @Override + public void setPorts(ValueRanges ports) { + this.ports = ports; + } } @Public @Stable - public static Resource newInstance(int memory, int vCores) { - return new SimpleResource(memory, vCores); + public static Resource newInstance(long memory, int vCores) { + return newInstance(memory, vCores, null); } - + @Public @Stable - public static Resource newInstance(long memory, int vCores) { - return new SimpleResource(memory, vCores); + public static Resource newInstance(long memory, int vCores, ValueRanges ports) { + Resource resource = Records.newRecord(Resource.class); + resource.setMemorySize(memory); + resource.setVirtualCores(vCores); + resource.setPorts(ports); + return resource; } - + /** * This method is DEPRECATED: * Use {@link Resource#getMemorySize()} instead @@ -138,6 +152,21 @@ public void setMemorySize(long memory) { "This method is implemented by ResourcePBImpl"); } + /** + * Get ports of the resource. + * @return ports of the resource + */ + @Public + @Stable + public abstract ValueRanges getPorts(); + + /** + * Set ports of the resource. + * @param ports ports of the resource + */ + @Public + @Stable + public abstract void setPorts(ValueRanges ports); /** * Get number of virtual cpu cores of the resource. @@ -193,6 +222,21 @@ public boolean equals(Object obj) { return true; } + public boolean equalsWithPorts(Object obj) { + if (!this.equals(obj)) { + return false; + } else { + Resource other = (Resource) obj; + ValueRanges lPorts = this.getPorts(); + ValueRanges rPorts = other.getPorts(); + if (lPorts == null) { + return rPorts == null; + } else { + return lPorts.equals(rPorts); + } + } + } + @Override public int compareTo(Resource other) { long diff = this.getMemorySize() - other.getMemorySize(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ValueRange.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ValueRange.java new file mode 100644 index 0000000..59a7fa1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ValueRange.java @@ -0,0 +1,83 @@ +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.yarn.util.Records; + +public abstract class ValueRange implements Comparable { + + public abstract int getBegin(); + + public abstract int getEnd(); + + public abstract void setBegin(int value); + + public abstract void setEnd(int value); + + public abstract boolean isLessOrEqual(ValueRange other); + + public static ValueRange newInstance(int begin, int end) { + ValueRange valueRange = Records.newRecord(ValueRange.class); + valueRange.setBegin(begin); + valueRange.setEnd(end); + return valueRange; + } + + @Override + public String toString() { + StringBuilder result = new StringBuilder(); + if (getBegin() == getEnd()) { + result.append(getBegin()); + } else { + result.append("[" + getBegin() + "-" + getEnd() + "]"); + } + return result.toString(); + } + + @Override + public int compareTo(ValueRange other) { + if (other == null) { + return -1; + } + + if (getBegin() == other.getBegin() && getEnd() == other.getEnd()) { + return 0; + } else if (getBegin() - other.getBegin() < 0) { + return -1; + } else if (getBegin() - other.getBegin() == 0 + && getEnd() - other.getEnd() < 0) { + return -1; + } else { + return 1; + } + + } + + @Override + public ValueRange clone() { + return ValueRange.newInstance(getBegin(), getEnd()); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (!(obj instanceof ValueRange)) + return false; + ValueRange other = (ValueRange) obj; + if (getBegin() == other.getBegin() && getEnd() == other.getEnd()) { + return true; + } else { + return false; + } + } + + @Override + public int hashCode() { + final int prime = 263167; + int result = 0; + result = prime * result + this.getBegin(); + result = prime * result + this.getEnd(); + return result; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ValueRanges.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ValueRanges.java new file mode 100644 index 0000000..5fa3c5c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ValueRanges.java @@ -0,0 +1,571 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.records; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.Collections; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.hadoop.yarn.util.Records; + +public abstract class ValueRanges implements Comparable { + + public static ValueRanges newInstance(List rangesList) { + ValueRanges valueRanges = Records.newRecord(ValueRanges.class); + valueRanges.setRangesList(rangesList); + return valueRanges; + } + + public static ValueRanges newInstance() { + ValueRanges valueRanges = Records.newRecord(ValueRanges.class); + return valueRanges; + } + + public abstract List getRangesList(); + + public abstract List getSortedRangesList(); + + public abstract void setRangesList(List rangesList); + + public abstract BitSet getBitSetStore(); + + public abstract void setBitSetStore(BitSet bitSetStore); + + public abstract boolean isByteStoreEnable(); + + public abstract void setByteStoreEnable(boolean enable); + + public abstract ByteBuffer getBytesStore(); + + @Override + public String toString() { + BitSet bitSetStore = this.getBitSetStore(); + List list = new ArrayList<>(); + + if (bitSetStore == null) { + for (ValueRange range : getSortedRangesList()) { + list.add(range.toString()); + } + } else { + for (int start = bitSetStore.nextSetBit(0); start >= 0;) { + int end = bitSetStore.nextClearBit(start) - 1; + list.add("[" + start + "-" + end + "]"); + start = bitSetStore.nextSetBit(end + 1); + } + } + + return String.join(",", list); + } + + public static ValueRanges convertToBitSet(ValueRanges original) { + ValueRanges result = ValueRanges.newInstance(); + BitSet bitSetStore = new BitSet(); + + if (original != null) { + if (original.getBitSetStore() != null) { + bitSetStore = original.getBitSetStore(); + } else { + if (original.isByteStoreEnable() && original.getBytesStore() != null) { + bitSetStore = BitSet.valueOf(original.getBytesStore()); + } else { + bitSetStore = + ValueRanges.convertFromRangesToBitSet(original.getRangesList()); + } + } + } + + result.setBitSetStore(bitSetStore); + return result; + } + + public static BitSet convertFromRangesToBitSet(List rangesList) { + BitSet bitSetStore = new BitSet(); + + if (rangesList != null) { + for (ValueRange range : rangesList) { + int start = range.getBegin(); + int end = range.getEnd(); + bitSetStore.set(start, end + 1); + } + } + return bitSetStore; + } + + public static List convertFromBitSetToRanges(BitSet bitSetStore) { + List resultList = new ArrayList(); + + if (bitSetStore != null) { + for (int start = bitSetStore.nextSetBit(0); start >= 0;) { + int end = bitSetStore.nextClearBit(start) - 1; + ValueRange range = ValueRange.newInstance(start, end); + resultList.add(range); + start = bitSetStore.nextSetBit(end + 1); + } + } + return resultList; + } + + public boolean isLessOrEqual(ValueRanges other) { + if (other == null) { + return false; + } + + BitSet leftBitSetStore = this.getBitSetStore(); + BitSet rightBitSetStore = other.getBitSetStore(); + boolean leftBitSetStored = (this.getBitSetStore() != null); + boolean rightBitSetStored = (other.getBitSetStore() != null); + + if (leftBitSetStored && rightBitSetStored) { + if (leftBitSetStore.length() > rightBitSetStore.length()) { + return false; + } + for (int i = 0; i < leftBitSetStore.length(); i++) { + if (leftBitSetStore.get(i) && !rightBitSetStore.get(i)) { + return false; + } + } + return true; + } else if (leftBitSetStored && !rightBitSetStored) { + for (ValueRange rightRange : coalesce(other).getRangesList()) { + leftBitSetStore.clear(rightRange.getBegin(), rightRange.getEnd() + 1); + } + return leftBitSetStore.cardinality() == 0; + } else if (!leftBitSetStored && rightBitSetStored) { + for (ValueRange leftRange : coalesce(this).getRangesList()) { + for (int i = leftRange.getBegin(); i <= leftRange.getEnd(); i++) { + if (!rightBitSetStore.get(i)) { + return false; + } + } + } + return true; + } else { + ValueRanges left = coalesce(this); + ValueRanges right = coalesce(other); + for (ValueRange leftRange : left.getRangesList()) { + boolean matched = false; + for (ValueRange rightRange : right.getRangesList()) { + if (leftRange.isLessOrEqual(rightRange)) { + matched = true; + break; + } + } + if (!matched) { + return false; + } + } + return true; + } + } + + public ValueRanges add(ValueRanges left, ValueRanges right) { + if (left == null) { + return coalesce(right); + } + if (right == null) { + return coalesce(left); + } + return coalesce(left, right); + } + + public ValueRanges minus(ValueRanges left, ValueRanges right) { + if (left == null) { + return null; + } + if (right == null) { + return coalesce(left); + } + return coalesce(left).minusSelf(right); + } + + public ValueRanges addSelf(ValueRanges other) { + if (other == null) { + return coalesce(this); + } + return coalesce(this, other); + } + + public ValueRanges minusSelf(ValueRanges other) { + if (other == null) { + return this; + } + + BitSet leftBitSetStore = this.getBitSetStore(); + BitSet rightBitSetStore = other.getBitSetStore(); + boolean leftBitSetStored = (this.getBitSetStore() != null); + boolean rightBitSetStored = (other.getBitSetStore() != null); + + ValueRanges result = ValueRanges.newInstance(); + + if (leftBitSetStored && rightBitSetStored) { + leftBitSetStore.andNot(rightBitSetStore); + + result.setBitSetStore(leftBitSetStore); + // to return ValueRanges which has the same store style to left + } else if (leftBitSetStored && !rightBitSetStored) { + for (ValueRange rightRange : coalesce(other).getRangesList()) { + leftBitSetStore.set(rightRange.getBegin(), rightRange.getEnd() + 1, + false); + } + + result.setBitSetStore(leftBitSetStore); + } else if (!leftBitSetStored && rightBitSetStored) { + BitSet bitSetStore = new BitSet(); + for (ValueRange leftRange : coalesce(this).getRangesList()) { + bitSetStore.set(leftRange.getBegin(), leftRange.getEnd() + 1, true); + } + bitSetStore.andNot(rightBitSetStore); + List resultList = convertFromBitSetToRanges(bitSetStore); + + result.setRangesList(resultList); + result.setCoalesced(true); + } else { + List leftList = cloneList(coalesce(this).getRangesList()); + List rightList = coalesce(other).getRangesList(); + int i = 0; + int j = 0; + while (i < leftList.size() && j < rightList.size()) { + ValueRange left = leftList.get(i); + ValueRange right = rightList.get(j); + // 1. no overlap, right is bigger than left + if (left.getEnd() < right.getBegin()) { + i++; + // 2. no overlap, left is bigger than right + } else if (right.getEnd() < left.getBegin()) { + j++; + // 3. has overlap, left is less than right + } else if ((left.getBegin() <= right.getBegin()) + && (left.getEnd() <= right.getEnd())) { + if (left.getBegin() == right.getBegin()) { + leftList.remove(i); + } else { + left.setEnd(right.getBegin() - 1); + } + // 4. has overlap, left is bigger than right + } else if ((left.getBegin() >= right.getBegin()) + && (left.getEnd() >= right.getEnd())) { + if (left.getEnd() == right.getEnd()) { + leftList.remove(i); + } else { + left.setBegin(right.getEnd() + 1); + } + // 5. left contains right + } else if ((left.getBegin() < right.getBegin()) + && (left.getEnd() > right.getEnd())) { + ValueRange newRange = + ValueRange.newInstance(right.getEnd() + 1, left.getEnd()); + leftList.add(i + 1, newRange); + left.setEnd(right.getBegin() - 1); + // 6. right contains left + } else if ((left.getBegin() > right.getBegin()) + && (left.getEnd() < right.getEnd())) { + leftList.remove(i); + } + } + + result.setRangesList(leftList); + result.setCoalesced(true); + } + return result; + } + + /** + * Coalescing ValueRanges + * + * @param left, may be ValueRanges or BitSetStores + * @param right, may be ValueRanges or BitSetStores + * @return merged ValueRanges whose internal store type is the same as left + */ + private ValueRanges coalesce(ValueRanges left, ValueRanges right) { + if (left == null) { + return right; + } + if (right == null) { + return left; + } + + BitSet leftBitSetStore = left.getBitSetStore(); + BitSet rightBitSetStore = right.getBitSetStore(); + boolean leftBitSetStored = (left.getBitSetStore() != null); + boolean rightBitSetStored = (right.getBitSetStore() != null); + + ValueRanges mergedRanges = ValueRanges.newInstance(); + if (leftBitSetStored && rightBitSetStored) { + BitSet bitSetStores = new BitSet(); + bitSetStores.or(leftBitSetStore); + bitSetStores.or(rightBitSetStore); + + mergedRanges.setBitSetStore(bitSetStores); + + } else if (leftBitSetStored && !rightBitSetStored) { + for (ValueRange rightRange : right.getRangesList()) { + leftBitSetStore.set(rightRange.getBegin(), rightRange.getEnd() + 1, + true); + } + + mergedRanges.setBitSetStore(leftBitSetStore); + } else if (!leftBitSetStored && rightBitSetStored) { + List rangesList = cloneList(left.getSortedRangesList()); + rangesList.addAll(convertFromBitSetToRanges(rightBitSetStore)); + Collections.sort(rangesList); + + mergedRanges.setRangesList(coalesceList(rangesList)); + mergedRanges.setCoalesced(true); + } else { + List leftList = cloneList(left.getRangesList()); + leftList.addAll(cloneList(right.getRangesList())); + Collections.sort(leftList); + + mergedRanges.setRangesList(coalesceList(leftList)); + mergedRanges.setCoalesced(true); + } + return mergedRanges; + } + + private static List coalesceList(List sortedList) { + if (sortedList == null || sortedList.isEmpty()) { + return sortedList; + } + + List resultList = new ArrayList(); + + ValueRange current = sortedList.get(0).clone(); + resultList.add(current); + + // In a single pass, we compute the size of the end result, as well as + // modify + // in place the intermediate data structure to build up result as we + // solve it. + + for (ValueRange range : sortedList) { + // Skip if this range is equivalent to the current range. + if (range.getBegin() == current.getBegin() + && range.getEnd() == current.getEnd()) { + continue; + } + // If the current range just needs to be extended on the right. + if (range.getBegin() == current.getBegin() + && range.getEnd() > current.getEnd()) { + current.setEnd(range.getEnd()); + } else if (range.getBegin() > current.getBegin()) { + // If we are starting farther ahead, then there are 2 cases: + if (range.getBegin() <= current.getEnd() + 1) { + // 1. Ranges are overlapping and we can merge them. + current.setEnd(Math.max(current.getEnd(), range.getEnd())); + } else { + // 2. No overlap and we are adding a new range. + current = range.clone(); + resultList.add(current); + } + } + } + return resultList; + } + + /** + * + * @param uranges that may be ValueRanges or BitSetStores, if it's + * BitSetStores, do nothing + * @return ValueRanges that is coalesced + */ + private static ValueRanges coalesce(ValueRanges uranges) { + if (uranges == null) { + return null; + } + + if (uranges.isCoalesced()) { + return uranges; + } + + if (uranges.getBitSetStore() != null) { + return uranges; + } + + ValueRanges result = ValueRanges.newInstance(); + if (uranges.getRangesCount() == 0) { + return result; + } + List rangesList = uranges.getSortedRangesList(); + + result.setRangesList(coalesceList(rangesList)); + result.setCoalesced(true); + + return result; + } + + public synchronized static List cloneList(List list) { + List newList = new ArrayList(); + for (ValueRange range : list) { + newList.add(range.clone()); + } + return newList; + } + + public abstract int getRangesCount(); + + /** + * This method is used to check if the ValueRanges coalesced, coalesced means + * no override parts and well sorted. For example, [1-3],[5-10] is coalesced, + * and [1-4],[3-10] and [5-10].[1-3] is not. + * + * @return true or false + */ + public abstract boolean isCoalesced(); + + public abstract void setCoalesced(boolean flag); + + /** + * Initialize the ValueRanges from expression, we current support[1-3],[5-10] + * style + * + * @param expression + * @return + */ + public static ValueRanges iniFromExpression(String expression) { + return iniFromExpression(expression, false); + } + + /** + * Initialize the ValueRanges from expression, we currently + * support[1-3],[5-10] style + * + * @param expression + * @return ValueRanges + */ + public static ValueRanges iniFromExpression(String expression, + boolean enableBitSet) { + ValueRanges valueRanges = Records.newRecord(ValueRanges.class); + String[] items = expression.split(","); + Pattern pattern = Pattern.compile("^\\[(\\d+)\\-(\\d+)\\]$"); + // Generate rangeList or bitSetStore + List rangesList = new ArrayList(); + BitSet bitSetStore = new BitSet(); + + for (String item : items) { + Matcher matcher = pattern.matcher(item); + if (matcher.find()) { + int start = Integer.parseInt(matcher.group(1)); + int end = Integer.parseInt(matcher.group(2)); + if (enableBitSet) { + bitSetStore.set(start, end + 1); + } else { + rangesList.add(ValueRange.newInstance(start, end)); + } + } else { + try { + int num = Integer.parseInt(item); + if (enableBitSet) { + bitSetStore.set(num); + } else { + rangesList.add(ValueRange.newInstance(num, num)); + } + } catch (NumberFormatException e) { + // ignore this num + } + } + } + if (enableBitSet) { + valueRanges.setBitSetStore(bitSetStore); + valueRanges.setByteStoreEnable(true); + } else { + valueRanges.setRangesList(rangesList); + } + return valueRanges; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (!(obj instanceof ValueRanges)) + return false; + ValueRanges other = (ValueRanges) obj; + if (this.equals(other)) { + return true; + } else { + return false; + } + } + + public synchronized boolean equals(ValueRanges other) { + if (other == null) { + return false; + } + + BitSet leftBitSetStore = this.getBitSetStore(); + BitSet rightBitSetStore = other.getBitSetStore(); + boolean leftBitSetStored = (this.getBitSetStore() != null); + boolean rightBitSetStored = (other.getBitSetStore() != null); + + if (leftBitSetStored && rightBitSetStored) { + return leftBitSetStore.equals(rightBitSetStore); + } else if (leftBitSetStored || rightBitSetStored) { + ValueRanges valueRanges = + leftBitSetStored ? coalesce(other) : coalesce(this); + BitSet bitSetStore = + leftBitSetStored ? leftBitSetStore : rightBitSetStore; + int count = 0; + for (ValueRange range : valueRanges.getRangesList()) { + for (int i = range.getBegin(); i <= range.getEnd(); i++) { + if (!bitSetStore.get(i)) { + return false; + } + } + count += range.getEnd() - range.getBegin() + 1; + } + return count == bitSetStore.cardinality(); + } else { + ValueRanges left = coalesce(this); + ValueRanges right = coalesce(other); + if (left.getRangesCount() != right.getRangesCount()) { + return false; + } + List leftRange = left.getRangesList(); + List rightRange = right.getRangesList(); + for (int i = 0; i < left.getRangesCount(); i++) { + if (!leftRange.get(i).equals(rightRange.get(i))) { + return false; + } + } + return true; + } + } + + @Override + public int hashCode() { + return getRangesList().hashCode(); + } + + @Override + public int compareTo(ValueRanges other) { + if (this.equals(other)) { + return 0; + } else if (this.isLessOrEqual(other)) { + return -1; + } else { + return 1; + } + } +} + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 86f45b8..1cf388e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1177,6 +1177,34 @@ public static boolean isAclEnabled(Configuration conf) { + "resource.pcores-vcores-multiplier"; public static final float DEFAULT_NM_PCORES_VCORES_MULTIPLIER = 1.0f; + /** Range of ports which can be allocated for containers. */ + public static final String NM_PORTS = NM_PREFIX + "resource.ports"; + public static final String DEFAULT_NM_PORTS = "[1-19999]"; + + /** + * Rounds of updating ports. This parameter is circle controller for updating + * local allocated ports info, since the ports info is big. We can control the + * update frequency to have balance with cluster scale and ports info's + * accuracy + */ + public static final String NM_PORTS_UPDATE_ROUNDS = NM_PREFIX + + "resource.ports-update-rounds"; + public static final int DEFAULT_NM_PORTS_UPDATE_ROUNDS = 10; + + /** Whether to enable ports collection */ + public static final String PORTS_AS_RESOURCE_ENABLE = YARN_PREFIX + + "ports_as_resource.enable"; + public static final boolean DEFAULT_PORTS_AS_RESOURCE_ENABLE = false; + + /** + * Whether to enable ports bitset store. If ports bitset store is enabled, + * memory usage for storing the status of ports usage will be reduced + */ + public static final String PORTS_BITSET_STORE_ENABLE = YARN_PREFIX + + "ports_bitset_store.enable"; + public static final boolean DEFAULT_PORTS_BITSET_STORE_ENABLE = false; + + /** Percentage of overall CPU which can be allocated for containers. */ public static final String NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT = NM_PREFIX + "resource.percentage-physical-cpu-limit"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 81ebd79..09608a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -53,9 +53,22 @@ message ContainerIdProto { optional int64 id = 3; } +message ValueRangeProto{ + required int32 begin = 1; + required int32 end = 2; +} + +message ValueRangesProto { + repeated ValueRangeProto ranges = 1; + optional bytes ranges_byte_store = 2; + optional bool byte_store_enable = 3 [default = false]; + optional int32 byte_store_encode = 4 [default = 0]; +} + message ResourceProto { optional int64 memory = 1; optional int32 virtual_cores = 2; + optional ValueRangesProto ports = 3; } message ResourceUtilizationProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java index 34109be..a5dba5d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java @@ -18,12 +18,13 @@ package org.apache.hadoop.yarn.api.records.impl.pb; - import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ValueRanges; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProtoOrBuilder; +import org.apache.hadoop.yarn.proto.YarnProtos.ValueRangesProto; @Private @Unstable @@ -31,6 +32,7 @@ ResourceProto proto = ResourceProto.getDefaultInstance(); ResourceProto.Builder builder = null; boolean viaProto = false; + ValueRanges ports = null; // call via ProtoUtils.convertToProtoFormat(Resource) static ResourceProto getProto(Resource r) { @@ -60,6 +62,21 @@ public ResourceProto getProto() { return proto; } + private synchronized void mergeLocalToBuilder() { + if (this.ports != null) { + builder.setPorts(convertToProtoFormat(this.ports)); + } + } + + private synchronized void mergeLocalToProto() { + if (viaProto){ + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + private void maybeInitBuilder() { if (viaProto || builder == null) { builder = ResourceProto.newBuilder(proto); @@ -102,4 +119,43 @@ public void setVirtualCores(int vCores) { maybeInitBuilder(); builder.setVirtualCores(vCores); } + @Override + public void setPorts(ValueRanges ports) { + maybeInitBuilder(); + if (ports == null) { + builder.clearPorts(); + } + this.ports = ports; + } + + @Override + public ValueRanges getPorts() { + ResourceProtoOrBuilder p = viaProto ? proto : builder; + if (this.ports != null) { + return this.ports; + } + if (!p.hasPorts()) { + return null; + } + this.ports = convertFromProtoFormat(p.getPorts()); + return this.ports; + } + + @Override + public int compareTo(Resource other) { + int diff = this.getMemory() - other.getMemory(); + if (diff == 0) { + diff = this.getVirtualCores() - other.getVirtualCores(); + } + return diff; + } + + private static ValueRanges convertFromProtoFormat( ValueRangesProto proto) { + return new ValueRangesPBImpl(proto); + } + + private ValueRangesProto convertToProtoFormat(ValueRanges m) { + return ((ValueRangesPBImpl)m).getProto(); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ValueRangePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ValueRangePBImpl.java new file mode 100644 index 0000000..f0ba964 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ValueRangePBImpl.java @@ -0,0 +1,103 @@ +package org.apache.hadoop.yarn.api.records.impl.pb; + +import org.apache.hadoop.yarn.api.records.ValueRange; +import org.apache.hadoop.yarn.proto.YarnProtos.ValueRangeProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ValueRangeProtoOrBuilder; + +public class ValueRangePBImpl extends ValueRange { + + ValueRangeProto proto = ValueRangeProto.getDefaultInstance(); + ValueRangeProto.Builder builder = null; + boolean viaProto = false; + int begin, end = -1; + + public ValueRangePBImpl(ValueRangeProto proto) { + this.proto = proto; + viaProto = true; + } + + public ValueRangePBImpl() { + } + + public ValueRangeProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int getBegin() { + initLocalRange(); + return begin; + } + + @Override + public int getEnd() { + initLocalRange(); + return end; + } + + @Override + public void setBegin(int value) { + begin = value; + } + + @Override + public void setEnd(int value) { + end = value; + } + + @Override + public boolean isLessOrEqual(ValueRange other) { + if (this.getBegin() >= other.getBegin() && this.getEnd() <= other.getEnd()) { + return true; + } + return false; + } + + private void maybeInitBuilder() { + if (viaProto) { + builder = ValueRangeProto.newBuilder(proto); + } + viaProto = false; + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void mergeLocalToBuilder() { + if (begin != -1 && end != -1) { + addRangeToProto(); + } + } + + private void addRangeToProto() { + maybeInitBuilder(); + if (begin == -1 && end == -1) + return; + if (builder == null) { + builder = ValueRangeProto.newBuilder(); + } + builder.setBegin(begin); + builder.setEnd(end); + } + + private void initLocalRange() { + if (begin != -1 && end != -1) { + return; + } + if (!viaProto && builder == null) { + builder = ValueRangeProto.newBuilder(); + } + ValueRangeProtoOrBuilder p = viaProto ? proto : builder; + begin = p.getBegin(); + end = p.getEnd(); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ValueRangesPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ValueRangesPBImpl.java new file mode 100644 index 0000000..f9a58bc --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ValueRangesPBImpl.java @@ -0,0 +1,275 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.api.records.impl.pb; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.BitSet; + +import com.google.protobuf.ByteString; +import org.apache.hadoop.yarn.api.records.ValueRange; +import org.apache.hadoop.yarn.api.records.ValueRanges; +import org.apache.hadoop.yarn.proto.YarnProtos.ValueRangeProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ValueRangesProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ValueRangesProtoOrBuilder; + +public class ValueRangesPBImpl extends ValueRanges { + + ValueRangesProto proto = ValueRangesProto.getDefaultInstance(); + ValueRangesProto.Builder builder = null; + boolean viaProto = false; + List ranges = null; + List unmodifiableRanges = null; + + private boolean isCoalesced = false; + + private BitSet bitSetStore = null; + + private boolean byteStoreEnable = false; + + /** + * TODO: we have a plan to compress the bitset if currently still allocate too + * much memory, like gzip to compress. But seems currenly we get the ideal + * result, so will re-consider the plan after roll-out to prod bed + */ + private int byte_store_encode = 0; + + public ValueRangesPBImpl(ValueRangesProto proto) { + this.proto = proto; + viaProto = true; + } + + public ValueRangesPBImpl() { + builder = ValueRangesProto.newBuilder(); + } + + public ValueRangesProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + public synchronized void setByteStoreEnable(boolean enable) { + byteStoreEnable = enable; + } + + public synchronized boolean isByteStoreEnable() { + if (ranges != null || bitSetStore != null) { + return byteStoreEnable; + } + + ValueRangesProtoOrBuilder p = viaProto ? proto : builder; + if (p.getByteStoreEnable() || p.hasRangesByteStore()) { + byteStoreEnable = true; + } + return byteStoreEnable; + } + + public boolean isCoalesced() { + return isCoalesced; + } + + public synchronized void setCoalesced(boolean flag) { + isCoalesced = flag; + } + + public synchronized BitSet getBitSetStore() { + initLocalRangesStore(); + if (bitSetStore != null) { + return (BitSet) bitSetStore.clone(); + } + return null; + } + + public synchronized void setBitSetStore(BitSet bitSetStore) { + this.bitSetStore = (BitSet) bitSetStore.clone(); + byteStoreEnable = true; + } + + @Override + public synchronized ByteBuffer getBytesStore() { + ValueRangesProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasRangesByteStore()) { + return null; + } + ByteBuffer rangesByteBuffer = + convertFromProtoFormat(p.getRangesByteStore()); + return rangesByteBuffer; + } + + private void initLocalRangesStore() { + if (this.ranges != null || this.bitSetStore != null) { + return; + } + isByteStoreEnable(); + if (byteStoreEnable) { + initLocalBitSetStore(); + } else { + initLocalRanges(); + } + } + + private void initLocalBitSetStore() { + if (this.bitSetStore != null) { + return; + } + + ValueRangesProtoOrBuilder p = viaProto ? proto : builder; + bitSetStore = new BitSet(); + if (!p.hasRangesByteStore()) { + return; + } + ByteBuffer rangesByteBuffer = + convertFromProtoFormat(p.getRangesByteStore()); + if (rangesByteBuffer != null) { + bitSetStore = BitSet.valueOf(rangesByteBuffer); + } + } + + private void initLocalRanges() { + if (this.ranges != null) { + return; + } + ValueRangesProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getRangesList(); + List tempRanges = new ArrayList(); + for (ValueRangeProto a : list) { + tempRanges.add(convertFromProtoFormat(a)); + } + assignRanges(tempRanges); + } + + @Override + public synchronized int getRangesCount() { + int result = 0; + initLocalRangesStore(); + if (bitSetStore != null) { + List list = convertFromBitSetToRanges(bitSetStore); + if (list != null) { + result = list.size(); + } + } else { + result = getRangesList().size(); + } + return result; + } + + private void assignRanges(List value) { + List newList = new ArrayList(); + for (ValueRange range : value) { + newList.add(range.clone()); + } + ranges = newList; + unmodifiableRanges = Collections.unmodifiableList(value); + } + + @Override + public synchronized List getSortedRangesList() { + initLocalRangesStore(); + List newList = cloneList(this.getRangesList()); + Collections.sort(newList); + return newList; + } + + @Override + public synchronized List getRangesList() { + initLocalRangesStore(); + return unmodifiableRanges; + } + + @Override + public synchronized void setRangesList(List rangesList) { + if (rangesList == null) { + maybeInitBuilder(); + builder.clearRanges(); + } + assignRanges(rangesList); + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ValueRangesProto.newBuilder(proto); + } + viaProto = false; + } + + private void mergeLocalToBuilder() { + if (this.ranges != null) { + addRangesToProto(); + } + if (byteStoreEnable) { + addByteStoreEnableToProto(); + addByteStoreToProto(); + } + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void addRangesToProto() { + maybeInitBuilder(); + if (ranges == null || ranges.isEmpty()) { + builder.clearRanges(); + return; + } + List list = new LinkedList<>(); + for (ValueRange range : ranges) { + list.add(convertToProtoFormat(range)); + } + builder.clearRanges(); + builder.addAllRanges(list); + } + + private void addByteStoreEnableToProto() { + maybeInitBuilder(); + builder.setByteStoreEnable(byteStoreEnable); + } + + private void addByteStoreToProto() { + if (this.bitSetStore != null) { + byte[] result = bitSetStore.toByteArray(); + builder.setRangesByteStore(convertToProtoFormat(ByteBuffer.wrap(result))); + } + } + + protected final ByteBuffer convertFromProtoFormat(ByteString byteString) { + return ProtoUtils.convertFromProtoFormat(byteString); + } + + protected final ByteString convertToProtoFormat(ByteBuffer byteBuffer) { + return ProtoUtils.convertToProtoFormat(byteBuffer); + } + + private static ValueRangePBImpl convertFromProtoFormat(ValueRangeProto a) { + return new ValueRangePBImpl(a); + } + + private static ValueRangeProto convertToProtoFormat(ValueRange t) { + return ((ValueRangePBImpl) t).getProto(); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/PortsInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/PortsInfo.java new file mode 100755 index 0000000..394334c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/PortsInfo.java @@ -0,0 +1,80 @@ +package org.apache.hadoop.yarn.util; + +import java.io.BufferedReader; +import java.io.File; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.file.Files; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.SysInfoWindows; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.yarn.api.records.ValueRanges; + +public class PortsInfo { + private static final Log LOG = LogFactory.getLog(PortsInfo.class); + private long lastRefreshTime; + static final int REFRESH_INTERVAL_MS = 2000; + + private ValueRanges ports; + + public PortsInfo() { + lastRefreshTime = 0; + reset(); + } + + long now() { + return Time.monotonicNow(); + } + + void reset() { + ports = null; + } + + void refreshIfNeeded(boolean enableBitSet) { + long now = now(); + if (now - lastRefreshTime > REFRESH_INTERVAL_MS) { + lastRefreshTime = now; + try { + File f = new File("GetAllocatedPorts.ps1"); + if (!f.exists()) { + Files.copy( + PortsInfo.class.getResourceAsStream("/GetAllocatedPorts.ps1"), + f.toPath()); + } + // Use a ProcessBuilder + ProcessBuilder pb = + new ProcessBuilder("powershell.exe", f.getAbsolutePath()); + Process p = pb.start(); + InputStream is = p.getInputStream(); + BufferedReader br = new BufferedReader(new InputStreamReader(is)); + String line = null; + String portsString = null; + while ((line = br.readLine()) != null) { + if (!line.isEmpty()) { + portsString = line; + } + } + if (portsString != null && !portsString.isEmpty()) { + ports = ValueRanges.iniFromExpression(portsString, enableBitSet); + } else { + LOG.warn( + "Get allocated ports result is empty, fail to get ports info "); + } + int r = p.waitFor(); // Let the process finish. + // Remove it after finish + f.deleteOnExit(); + } catch (Exception e) { + LOG.warn("Fail to get allocated ports info "); + e.printStackTrace(); + } + } + } + + public ValueRanges GetAllocatedPorts(boolean enableBitSet) { + refreshIfNeeded(enableBitSet); + return ports; + } +} + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java index bdf60bd..a2d4374 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java @@ -36,6 +36,11 @@ public int compare(Resource unused, Resource lhs, Resource rhs, return Long.compare(lhs.getMemorySize(), rhs.getMemorySize()); } + public int compareWithPorts(Resource clusterResource, Resource lhs, + Resource rhs) { + return compare(clusterResource, lhs, rhs); + } + @Override public long computeAvailableContainers(Resource available, Resource required) { // Only consider memory @@ -63,13 +68,13 @@ public float ratio(Resource a, Resource b) { @Override public Resource divideAndCeil(Resource numerator, int denominator) { return Resources.createResource( - divideAndCeil(numerator.getMemorySize(), denominator)); + divideAndCeil(numerator.getMemory(), denominator)); } @Override public Resource divideAndCeil(Resource numerator, float denominator) { return Resources.createResource( - divideAndCeil(numerator.getMemorySize(), denominator)); + divideAndCeil(numerator.getMemory(), denominator)); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java index 7697e1d..f97ff14 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java @@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ValueRanges; /** * A {@link ResourceCalculator} which uses the concept of @@ -93,6 +94,24 @@ public int compare(Resource clusterResource, Resource lhs, Resource rhs, return 0; } + public int compareWithPorts(Resource clusterResource, Resource lhs, + Resource rhs) { + int diff = compare(clusterResource, lhs, rhs); + + if (diff == 0) { + ValueRanges lPorts = lhs.getPorts(); + ValueRanges rPorts = rhs.getPorts(); + if(lPorts == null){ + diff = rPorts == null ? 0 : 1; + } else if (rPorts == null) { + diff = -1; + } else { + diff = lPorts.compareTo(rPorts); + } + } + return diff; + } + /** * Use 'dominant' for now since we only have 2 resources - gives us a slight * performance boost. @@ -117,8 +136,14 @@ protected float getResourceAsValue( @Override public long computeAvailableContainers(Resource available, Resource required) { + if (required.getPorts() != null && required.getPorts().getRangesCount()>0) { + // required ports resource, so we can not allocate more than one container + return Math.min( + Math.min(available.getMemorySize() / required.getMemorySize(), + available.getVirtualCores() / required.getVirtualCores()), 1); + } return Math.min( - available.getMemorySize() / required.getMemorySize(), + available.getMemorySize() / required.getMemorySize(), available.getVirtualCores() / required.getVirtualCores()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java index 398dac5..6e3a58f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java @@ -58,6 +58,9 @@ public abstract int compare( public int compare(Resource clusterResource, Resource lhs, Resource rhs) { return compare(clusterResource, lhs, rhs, false); } + public abstract int compareWithPorts(Resource clusterResource, Resource lhs, + Resource rhs); + public static int divideAndCeil(int a, int b) { if (b == 0) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java index a1d14fd..040c9a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java @@ -21,6 +21,8 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ValueRanges; +import org.apache.hadoop.yarn.util.Records; @InterfaceAudience.LimitedPrivate({"YARN", "MapReduce"}) @Unstable @@ -62,6 +64,16 @@ public void setVirtualCores(int cores) { } @Override + public ValueRanges getPorts() { + return null; + } + + @Override + public void setPorts(ValueRanges port) { + throw new RuntimeException("NONE cannot be modified!"); + } + + @Override public int compareTo(Resource o) { long diff = 0 - o.getMemorySize(); if (diff == 0) { @@ -105,6 +117,16 @@ public int getVirtualCores() { public void setVirtualCores(int cores) { throw new RuntimeException("UNBOUNDED cannot be modified!"); } + + @Override + public ValueRanges getPorts() { + return null; + } + + @Override + public void setPorts(ValueRanges port) { + throw new RuntimeException("NONE cannot be modified!"); + } @Override public int compareTo(Resource o) { @@ -117,21 +139,22 @@ public int compareTo(Resource o) { }; - public static Resource createResource(int memory) { - return createResource(memory, (memory > 0) ? 1 : 0); - } - - public static Resource createResource(int memory, int cores) { - return Resource.newInstance(memory, cores); - } - - public static Resource createResource(long memory) { - return createResource(memory, (memory > 0) ? 1 : 0); - } - - public static Resource createResource(long memory, int cores) { - return Resource.newInstance(memory, cores); - } + public static Resource createResource(long memory) { + return createResource(memory, (memory > 0) ? 1 : 0); + } + + public static Resource createResource(long memory, int cores) { + return createResource(memory, cores, null); + } + + public static Resource createResource(long memory, int cores, + ValueRanges ports) { + Resource resource = Records.newRecord(Resource.class); + resource.setMemorySize(memory); + resource.setVirtualCores(cores); + resource.setPorts(ports); + return resource; + } public static Resource none() { return NONE; @@ -154,25 +177,67 @@ public static Resource unbounded() { public static Resource clone(Resource res) { return createResource(res.getMemorySize(), res.getVirtualCores()); } + + public static Resource cloneWithPorts(Resource res) { + return createResource(res.getMemorySize(), res.getVirtualCores(), + res.getPorts()); + } public static Resource addTo(Resource lhs, Resource rhs) { + return addTo(lhs, rhs, true); + } + + public static Resource addToWithPorts(Resource lhs, Resource rhs) { + return addTo(lhs, rhs, false); + } + + public static Resource addTo(Resource lhs, Resource rhs, boolean ignorePorts) { lhs.setMemorySize(lhs.getMemorySize() + rhs.getMemorySize()); lhs.setVirtualCores(lhs.getVirtualCores() + rhs.getVirtualCores()); + if (!ignorePorts) { + if (lhs.getPorts() != null) { + lhs.setPorts(lhs.getPorts().addSelf(rhs.getPorts())); + } else { + lhs.setPorts(rhs.getPorts()); + } + } return lhs; } public static Resource add(Resource lhs, Resource rhs) { return addTo(clone(lhs), rhs); } + + public static Resource addWithPorts(Resource lhs, Resource rhs) { + return addToWithPorts(cloneWithPorts(lhs), rhs); + } public static Resource subtractFrom(Resource lhs, Resource rhs) { - lhs.setMemorySize(lhs.getMemorySize() - rhs.getMemorySize()); + return subtractFrom(lhs, rhs, true); + } + + public static Resource subtractFromWithPorts(Resource lhs, Resource rhs) { + return subtractFrom(lhs, rhs, false); + } + + public static Resource subtractFrom(Resource lhs, Resource rhs, + boolean ignorePorts) { + lhs.setMemory(lhs.getMemory() - rhs.getMemory()); lhs.setVirtualCores(lhs.getVirtualCores() - rhs.getVirtualCores()); + if (!ignorePorts) { + if (lhs.getPorts() != null) { + lhs.setPorts(lhs.getPorts().minusSelf(rhs.getPorts())); + } + } return lhs; } public static Resource subtract(Resource lhs, Resource rhs) { - return subtractFrom(clone(lhs), rhs); + return subtractFrom(clone(lhs), rhs , true); + } + + public static Resource subtractWithPorts(Resource lhs, Resource rhs) { + return subtractFrom(cloneWithPorts(lhs), rhs , false); } /** @@ -331,6 +396,46 @@ public static Resource max( return resourceCalculator.compare(clusterResource, lhs, rhs) >= 0 ? lhs : rhs; } + public static boolean equalsWithPorts(Resource lhs, Resource rhs) { + return lhs.equalsWithPorts(rhs); + } + + public static boolean lessThanWithPorts( + ResourceCalculator resourceCalculator, + Resource clusterResource, Resource lhs, Resource rhs) { + return (resourceCalculator.compareWithPorts(clusterResource, lhs, rhs) < 0); + } + + public static boolean lessThanOrEqualWithPorts( + ResourceCalculator resourceCalculator, + Resource clusterResource, Resource lhs, Resource rhs) { + return (resourceCalculator.compareWithPorts(clusterResource, lhs, rhs) <= 0); + } + + public static boolean greaterThanWithPorts( + ResourceCalculator resourceCalculator, + Resource clusterResource, Resource lhs, Resource rhs) { + return resourceCalculator.compareWithPorts(clusterResource, lhs, rhs) > 0; + } + + public static boolean greaterThanOrEqualWithPorts( + ResourceCalculator resourceCalculator, Resource clusterResource, + Resource lhs, Resource rhs) { + return resourceCalculator.compareWithPorts(clusterResource, lhs, rhs) >= 0; + } + + public static Resource minWithPorts(ResourceCalculator resourceCalculator, + Resource clusterResource, Resource lhs, Resource rhs) { + return resourceCalculator.compareWithPorts(clusterResource, lhs, rhs) <= 0 ? lhs + : rhs; + } + + public static Resource maxWithPorts(ResourceCalculator resourceCalculator, + Resource clusterResource, Resource lhs, Resource rhs) { + return resourceCalculator.compareWithPorts(clusterResource, lhs, rhs) >= 0 ? lhs + : rhs; + } + public static boolean fitsIn(Resource smaller, Resource bigger) { return smaller.getMemorySize() <= bigger.getMemorySize() && smaller.getVirtualCores() <= bigger.getVirtualCores(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/BasePBImplRecordsTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/BasePBImplRecordsTest.java index 82170b3..3751afe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/BasePBImplRecordsTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/BasePBImplRecordsTest.java @@ -22,6 +22,10 @@ import com.google.common.collect.Sets; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.NodeLabel; +import org.apache.hadoop.yarn.api.records.QueueStatistics; +import org.apache.hadoop.yarn.api.records.ValueRange; +import org.apache.hadoop.yarn.api.records.ValueRanges; import org.junit.Assert; import java.lang.reflect.*; @@ -103,6 +107,20 @@ private static Object genTypeValue(Type type) { ret = map; } } + if (type.equals(ValueRanges.class)) { + ret = ValueRanges.newInstance(); + } + if (type.equals(ValueRange.class)) { + ret = ValueRange.newInstance(0, 0); + } + if (type.equals(NodeLabel.class)) { + ret = NodeLabel.newInstance("test"); + } + if (type.equals(QueueStatistics.class)) { + ret = + QueueStatistics.newInstance(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0); + } if (ret == null) { throw new IllegalArgumentException("type " + type + " is not supported"); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java index bb688c9..84c2002 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java @@ -324,7 +324,14 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReplaceLabelsOnNodeResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResourceResponsePBImpl; +import org.apache.hadoop.yarn.api.records.ValueRange; +import org.apache.hadoop.yarn.proto.YarnProtos.ValueRangeProto; +import org.apache.hadoop.yarn.api.records.impl.pb.ValueRangePBImpl; +import org.apache.hadoop.yarn.api.records.ValueRanges; +import org.apache.hadoop.yarn.proto.YarnProtos.ValueRangesProto; +import org.apache.hadoop.yarn.api.records.impl.pb.ValueRangesPBImpl; import org.apache.hadoop.yarn.util.resource.Resources; + import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Ignore; @@ -1151,4 +1158,14 @@ public void testExecutionTypeRequestPBImpl() throws Exception { validatePBImplRecord(ExecutionTypeRequestPBImpl.class, ExecutionTypeRequestProto.class); } + + @Test + public void testCheckForValueRangePBImpl() throws Exception { + validatePBImplRecord(ValueRangePBImpl.class, ValueRangeProto.class); + } + + @Test + public void testCheckForValueRangesPBImpl() throws Exception { + validatePBImplRecord(ValueRangesPBImpl.class, ValueRangesProto.class); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/TestValueRanges.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/TestValueRanges.java new file mode 100644 index 0000000..000b436 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/TestValueRanges.java @@ -0,0 +1,209 @@ +package org.apache.hadoop.yarn.api.records; + +import org.junit.Assert; +import org.junit.Test; + +public class TestValueRanges { + @Test(timeout = 2000) + public void testValueRnagesBasicOperation() { + + // Equal Test + ValueRanges lhs = ValueRanges.iniFromExpression("[1-3]", true); + ValueRanges rhs = ValueRanges.iniFromExpression("[1-3]"); + lhs = ValueRanges.convertToBitSet(lhs); + rhs = ValueRanges.convertToBitSet(rhs); + Assert.assertEquals("Equal operation is wrongly calculated.", lhs, rhs); + + lhs = ValueRanges.iniFromExpression("[1-3]", true); + rhs = ValueRanges.iniFromExpression("[1-10]", true); + lhs = ValueRanges.convertToBitSet(lhs); + rhs = ValueRanges.convertToBitSet(rhs); + Assert.assertFalse("Equal operation is wrongly calculated.", + lhs.equals(rhs)); + + lhs = ValueRanges.iniFromExpression("1,2,3,4,5,6,7,8,9,10"); + rhs = ValueRanges.iniFromExpression("[1-10]"); + lhs = ValueRanges.convertToBitSet(lhs); + rhs = ValueRanges.convertToBitSet(rhs); + Assert.assertEquals("Equal operation is wrongly calculated.", lhs, rhs); + + // Add Test + lhs = ValueRanges.iniFromExpression("[1-3]", true); + lhs = ValueRanges.convertToBitSet(lhs); + lhs = lhs.addSelf(ValueRanges.iniFromExpression("[4-10]")); + rhs = ValueRanges.iniFromExpression("[1-10]"); + Assert.assertEquals("Equal operation is wrongly calculated.", lhs, rhs); + + lhs = ValueRanges.iniFromExpression("[1-3]"); + ValueRanges temp = ValueRanges.iniFromExpression("[4-10]", true); + temp = ValueRanges.convertToBitSet(temp); + lhs = lhs.addSelf(temp); + rhs = ValueRanges.iniFromExpression("[1-10]"); + Assert.assertEquals("Equal operation is wrongly calculated.", lhs, rhs); + + lhs = ValueRanges.iniFromExpression("[1-3]") + .addSelf(ValueRanges.iniFromExpression("[7-10]")); + rhs = ValueRanges.iniFromExpression("[1-3],[7-10]"); + Assert.assertEquals("Equal operation is wrongly calculated.", lhs, rhs); + + lhs = ValueRanges.newInstance(); + lhs = ValueRanges.convertToBitSet(lhs); + lhs = lhs.addSelf(ValueRanges.iniFromExpression("[1-10]")); + + rhs = ValueRanges.iniFromExpression("[1-10]"); + Assert.assertEquals("Equal operation is wrongly calculated.", lhs, rhs); + + // Minus Test + + lhs = ValueRanges.iniFromExpression("[1-3]", true); + rhs = ValueRanges.iniFromExpression("[1-10]"); + lhs = ValueRanges.convertToBitSet(lhs); + rhs = ValueRanges.convertToBitSet(rhs); + rhs = rhs.minusSelf(ValueRanges.iniFromExpression("[4-10]")); + Assert.assertEquals("Equal operation is wrongly calculated.", lhs, rhs); + + lhs = ValueRanges.iniFromExpression("[1-3],[5-10]"); + rhs = ValueRanges.iniFromExpression("[1-10]"); + rhs = ValueRanges.convertToBitSet(rhs); + temp = ValueRanges.iniFromExpression("4"); + rhs = rhs.minusSelf(temp); + Assert.assertEquals("Equal operation is wrongly calculated.", lhs, rhs); + + lhs = ValueRanges.iniFromExpression("[1-3],[5-10]"); + rhs = ValueRanges.iniFromExpression("[1-10]", true); + rhs = ValueRanges.convertToBitSet(rhs); + temp = ValueRanges.iniFromExpression("4"); + rhs = rhs.minusSelf(temp); + Assert.assertEquals("Equal operation is wrongly calculated.", lhs, rhs); + + lhs = ValueRanges.iniFromExpression("[1-3],[5-10]", true); + rhs = ValueRanges.iniFromExpression("[1-10]", true) + .minusSelf(ValueRanges.iniFromExpression("4")); + Assert.assertEquals("Equal operation is wrongly calculated.", lhs, rhs); + + lhs = ValueRanges.iniFromExpression("[1-3],[6-10]", true); + rhs = ValueRanges.iniFromExpression("[1-10]") + .minusSelf(ValueRanges.iniFromExpression("[4-5]", true)); + Assert.assertEquals("Equal operation is wrongly calculated.", lhs, rhs); + + lhs = ValueRanges.iniFromExpression("[1-3],[10-20]"); + rhs = ValueRanges.iniFromExpression("[1-10]", true) + .minusSelf(ValueRanges.iniFromExpression("[4-10]")); + Assert.assertEquals("Equal operation is wrongly calculated.", false, + rhs.equals(lhs)); + + rhs = ValueRanges.iniFromExpression("[3-5]") + .minusSelf(ValueRanges.iniFromExpression("[1-10]")); + Assert.assertEquals("Equal operation is wrongly calculated.", + ValueRanges.newInstance(), rhs); + + lhs = ValueRanges.iniFromExpression("[10-20]"); + rhs = ValueRanges.iniFromExpression("[8-20]") + .minusSelf(ValueRanges.iniFromExpression("[1-9]")); + Assert.assertEquals("Equal operation is wrongly calculated.", lhs, rhs); + + lhs = ValueRanges.iniFromExpression("[1-3],[10-20]"); + rhs = ValueRanges.iniFromExpression("[1-7],[9-20]") + .minusSelf(ValueRanges.iniFromExpression("[4-9]")); + Assert.assertEquals("Equal operation is wrongly calculated.", lhs, rhs); + + lhs = ValueRanges.iniFromExpression("[1-3],[10-20],[40-80],[95-100]", true); + rhs = ValueRanges.iniFromExpression("[1-100]", true); + // 1. left contains right + rhs = rhs.minusSelf(ValueRanges.iniFromExpression("[4-6],[21-30]")); + Assert.assertEquals("Equal operation is wrongly calculated.", rhs, + ValueRanges.iniFromExpression("[1-3],[7-20],[31-100]")); + + ValueRanges result = ValueRanges.iniFromExpression("[1-3],[7-20],[31-100]"); + Assert.assertEquals("Equal operation is wrongly calculated.", rhs, result); + // 2. has overlap, left is bigger than right + rhs = rhs.minusSelf(ValueRanges.iniFromExpression("[4-9],[93-94]")); + Assert.assertEquals("Equal operation is wrongly calculated.", rhs, + ValueRanges.iniFromExpression("[1-3],[10-20],[31-92],[95-100]")); + // 3. has overlap, left is less than right + rhs = rhs.minusSelf(ValueRanges.iniFromExpression("[81-83],[91-94]")); + Assert.assertEquals("Equal operation is wrongly calculated.", rhs, + ValueRanges + .iniFromExpression("[1-3],[10-20],[31-80],[84-90],[95-100]")); + // 4. right contains left, and no overlap + rhs = rhs.minusSelf(ValueRanges.iniFromExpression("[81-93],94,[101-103]")); + Assert.assertEquals("Equal operation is wrongly calculated.", rhs, + ValueRanges.iniFromExpression("[1-3],[10-20],[31-80],[95-100]")); + + rhs = rhs.minusSelf(ValueRanges.iniFromExpression("[31-39],[81-94]")); + Assert.assertEquals("Equal operation is wrongly calculated.", lhs, rhs); + + lhs = ValueRanges.iniFromExpression("[1-3],[10-20],[40-80],[95-100]"); + rhs = ValueRanges.iniFromExpression("[1-100]"); + // 1. left contains right + temp = ValueRanges.iniFromExpression("[4-6],[21-30]"); + rhs = rhs.minusSelf(temp); + Assert.assertEquals("Equal operation is wrongly calculated.", rhs, + ValueRanges.iniFromExpression("[1-3],[7-20],[31-100]")); + + result = ValueRanges.iniFromExpression("[1-3],[7-20],[31-100]"); + Assert.assertEquals("Equal operation is wrongly calculated.", result, rhs); + + // 2. has overlap, left is bigger than right + temp = ValueRanges.newInstance(); + ValueRanges temp2 = ValueRanges.iniFromExpression("[4-9],[93-94]"); + temp2 = ValueRanges.convertToBitSet(temp2); + temp.setBitSetStore(temp2.getBitSetStore()); + rhs = rhs.minusSelf(temp); + Assert.assertEquals("Equal operation is wrongly calculated.", rhs, + ValueRanges.iniFromExpression("[1-3],[10-20],[31-92],[95-100]")); + + // 3. has overlap, left is less than right + rhs = rhs.minusSelf(ValueRanges.iniFromExpression("[81-83],[91-94]")); + + temp = ValueRanges + .iniFromExpression("[1-3],[10-20],[31-80],[84-90],[95-100]", true); + Assert.assertEquals("Equal operation is wrongly calculated.", temp, rhs); + + // 4. right contains left, and no overlap + temp = ValueRanges.iniFromExpression("[81-93],94,[101-103]", true); + rhs = rhs.minusSelf(temp); + Assert.assertEquals("Equal operation is wrongly calculated.", rhs, + ValueRanges.iniFromExpression("[1-3],[10-20],[31-80],[95-100]")); + + rhs = rhs.minusSelf(ValueRanges.iniFromExpression("[31-39],[81-94]")); + Assert.assertEquals("Equal operation is wrongly calculated.", lhs, rhs); + + // Less Test + lhs = ValueRanges.iniFromExpression("[1-3]"); + rhs = ValueRanges.iniFromExpression("[1-10]") + .minusSelf(ValueRanges.iniFromExpression("[3-10]")); + Assert.assertTrue("LessOrEqual operation is wrongly calculated.", + rhs.isLessOrEqual(lhs)); + + lhs = ValueRanges.newInstance(); + rhs = ValueRanges.iniFromExpression("[1-10]") + .minusSelf(ValueRanges.iniFromExpression("[1-10]")); + Assert.assertEquals("Equal operation is wrongly calculated.", lhs, rhs); + + lhs = ValueRanges.iniFromExpression("[1-3],[10-20]"); + rhs = ValueRanges.iniFromExpression("[1-20]") + .minusSelf(ValueRanges.iniFromExpression("[4-5]")); + Assert.assertFalse("LessOrEqual operation is wrongly calculated.", + rhs.isLessOrEqual(lhs)); + + lhs = ValueRanges.iniFromExpression("[1-3],[10-20]"); + rhs = ValueRanges.iniFromExpression("[1-20]") + .minusSelf(ValueRanges.iniFromExpression("[4-10]")); + Assert.assertTrue("LessOrEqual operation is wrongly calculated.", + rhs.isLessOrEqual(lhs)); + + // ToString Test + String lhsString = ValueRanges + .iniFromExpression("[1-3],[10-20],[31-92],[95-100]").toString(); + String rhsString = "[1-3],[10-20],[31-92],[95-100]"; + Assert.assertTrue("ToString is wrongly calculated.", + lhsString.equals(rhsString)); + + lhsString = ValueRanges + .iniFromExpression("[1-3],[10-20],[31-92],[95-100]", true).toString(); + Assert.assertTrue("ToString is wrongly calculated.", + lhsString.equals(rhsString)); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourcesCalculatorWithPorts.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourcesCalculatorWithPorts.java new file mode 100644 index 0000000..d9bb1dc --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourcesCalculatorWithPorts.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.util.resource; + +import java.util.Arrays; +import java.util.Collection; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ValueRanges; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestResourcesCalculatorWithPorts { + private ResourceCalculator resourceCalculator; + + @Parameterized.Parameters + public static Collection getParameters() { + return Arrays.asList(new ResourceCalculator[][] { + { new DefaultResourceCalculator() }, + { new DominantResourceCalculator() } }); + } + + public TestResourcesCalculatorWithPorts(ResourceCalculator rs) { + this.resourceCalculator = rs; + } + + @Test(timeout = 10000) + public void testResourceCalculatorCompareMethodWithPorts() { + Resource clusterResource = Resource.newInstance(0, 0); + + // For lhs == rhs + Resource lhs = + Resource.newInstance(0, 0, ValueRanges.iniFromExpression("[1-3]")); + Resources.addToWithPorts(lhs, + Resource.newInstance(1, 1, ValueRanges.iniFromExpression("[4-10]"))); + Resource rhs = + Resource.newInstance(1, 1, ValueRanges.iniFromExpression("[1-10]")); + assertResourcesOperationsWithPorts(clusterResource, lhs, rhs, false, true, + false, true, lhs, lhs); + + // For lhs == rhs + lhs = Resource.newInstance(0, 0, ValueRanges.iniFromExpression("[1-3]")); + lhs = + Resources + .addWithPorts( + lhs, + Resource.newInstance(1, 1, + ValueRanges.iniFromExpression("[4-10]"))); + rhs = Resource.newInstance(1, 1, ValueRanges.iniFromExpression("[1-10]")); + assertResourcesOperationsWithPorts(clusterResource, lhs, rhs, false, true, + false, true, lhs, lhs); + + // For lhs == rhs + lhs = Resource.newInstance(0, 0, ValueRanges.iniFromExpression("[1-3]")); + rhs = Resource.newInstance(1, 1, ValueRanges.iniFromExpression("[1-10]")); + Resources.subtractFromWithPorts(rhs, + Resource.newInstance(1, 1, ValueRanges.iniFromExpression("[4-10]"))); + assertResourcesOperationsWithPorts(clusterResource, lhs, rhs, false, true, + false, true, lhs, lhs); + + // For lhs == rhs + lhs = Resource.newInstance(0, 0, ValueRanges.iniFromExpression("[1-3]")); + rhs = Resource.newInstance(1, 1, ValueRanges.iniFromExpression("[1-10]")); + rhs = + Resources + .subtractWithPorts( + rhs, + Resource.newInstance(1, 1, + ValueRanges.iniFromExpression("[4-10]"))); + assertResourcesOperationsWithPorts(clusterResource, lhs, rhs, false, true, + false, true, lhs, lhs); + + // For lhs == rhs + lhs = Resource.newInstance(0, 0, ValueRanges.iniFromExpression("[1-3]")); + rhs = Resources.cloneWithPorts(lhs); + assertResourcesOperationsWithPorts(clusterResource, lhs, rhs, false, true, + false, true, lhs, lhs); + + if (!(resourceCalculator instanceof DominantResourceCalculator)) { + return; + } + + // dominant resource types + // For lhs > rhs + lhs = Resource.newInstance(0, 0, ValueRanges.iniFromExpression("[1-3]")); + Resources.addToWithPorts(lhs, + Resource.newInstance(1, 1, ValueRanges.iniFromExpression("[4-11]"))); + rhs = Resource.newInstance(1, 1, ValueRanges.iniFromExpression("[1-10]")); + assertResourcesOperationsWithPorts(clusterResource, lhs, rhs, false, false, + true, true, lhs, rhs); + + // For lhs > rhs + lhs = Resource.newInstance(0, 0, ValueRanges.iniFromExpression("[1-3]")); + lhs = + Resources + .addWithPorts( + lhs, + Resource.newInstance(1, 1, + ValueRanges.iniFromExpression("[4-11]"))); + rhs = Resource.newInstance(1, 1, ValueRanges.iniFromExpression("[1-10]")); + assertResourcesOperationsWithPorts(clusterResource, lhs, rhs, false, false, + true, true, lhs, rhs); + + // For lhs > rhs + lhs = Resource.newInstance(0, 0, ValueRanges.iniFromExpression("[1-3]")); + rhs = Resource.newInstance(1, 1, ValueRanges.iniFromExpression("[1-10]")); + Resources.subtractFromWithPorts(rhs, + Resource.newInstance(1, 1, ValueRanges.iniFromExpression("[3-10]"))); + assertResourcesOperationsWithPorts(clusterResource, lhs, rhs, false, false, + true, true, lhs, rhs); + + // For lhs > rhs + lhs = Resource.newInstance(0, 0, ValueRanges.iniFromExpression("[1-3]")); + rhs = Resource.newInstance(1, 1, ValueRanges.iniFromExpression("[1-10]")); + rhs = + Resources + .subtractWithPorts( + rhs, + Resource.newInstance(1, 1, + ValueRanges.iniFromExpression("[3-10]"))); + assertResourcesOperationsWithPorts(clusterResource, lhs, rhs, false, false, + true, true, lhs, rhs); + } + + @Test(timeout = 10000) + public void testResourceCalculatorCompareMethodWithPorts2() { + Resource clusterResource = Resource.newInstance(0, 0); + + // For lhs == rhs + Resource lhs = + Resource.newInstance(0, 0, ValueRanges.iniFromExpression("[1-3]")); + Resource rhs = + Resource.newInstance(0, 0, ValueRanges.iniFromExpression("[1-3]")); + assertResourcesOperationsWithPorts(clusterResource, lhs, rhs, false, true, + false, true, lhs, lhs); + + // lhs > rhs + lhs = Resource.newInstance(1, 1, ValueRanges.iniFromExpression("[1-10]")); + rhs = Resource.newInstance(0, 0, ValueRanges.iniFromExpression("[3-5]")); + assertResourcesOperationsWithPorts(clusterResource, lhs, rhs, false, false, + true, true, lhs, rhs); + + // For lhs < rhs + lhs = Resource.newInstance(0, 0, ValueRanges.iniFromExpression("[3-5]")); + rhs = Resource.newInstance(1, 1, ValueRanges.iniFromExpression("[1-10]")); + assertResourcesOperationsWithPorts(clusterResource, lhs, rhs, true, true, + false, false, rhs, lhs); + + if (!(resourceCalculator instanceof DominantResourceCalculator)) { + return; + } + + // verify for 2 dimensional resources i.e memory and cpu + // dominant resource types + lhs = Resource.newInstance(1, 0, ValueRanges.iniFromExpression("[1-10]")); + rhs = Resource.newInstance(0, 1, ValueRanges.iniFromExpression("[1-10]")); + assertResourcesOperationsWithPorts(clusterResource, lhs, rhs, false, true, + false, true, lhs, lhs); + + lhs = Resource.newInstance(1, 1, ValueRanges.iniFromExpression("[1-10]")); + rhs = Resource.newInstance(1, 1, ValueRanges.iniFromExpression("[1-10]")); + assertResourcesOperationsWithPorts(clusterResource, lhs, rhs, false, true, + false, true, lhs, rhs); + + lhs = Resource.newInstance(0, 1, ValueRanges.iniFromExpression("[1-10]")); + rhs = Resource.newInstance(1, 0, ValueRanges.iniFromExpression("[1-10]")); + assertResourcesOperationsWithPorts(clusterResource, lhs, rhs, false, true, + false, true, lhs, lhs); + + lhs = Resource.newInstance(1, 1, ValueRanges.iniFromExpression("[2-10]")); + rhs = Resource.newInstance(1, 1, ValueRanges.iniFromExpression("[1-5]")); + assertResourcesOperationsWithPorts(clusterResource, lhs, rhs, false, false, + true, true, lhs, rhs); + + lhs = Resource.newInstance(1, 1, ValueRanges.iniFromExpression("[1-20]")); + rhs = Resource.newInstance(1, 0, ValueRanges.iniFromExpression("[1-10]")); + assertResourcesOperationsWithPorts(clusterResource, lhs, rhs, false, false, + true, true, lhs, rhs); + + lhs = Resource.newInstance(1, 1, ValueRanges.iniFromExpression("[1-10]")); + rhs = Resource.newInstance(1, 1, ValueRanges.iniFromExpression("[1-20]")); + assertResourcesOperationsWithPorts(clusterResource, lhs, rhs, true, true, + false, false, rhs, lhs); + + lhs = Resource.newInstance(1, 1, ValueRanges.iniFromExpression("[1-10]")); + rhs = Resource.newInstance(1, 1, ValueRanges.iniFromExpression("[2-8]")); + assertResourcesOperationsWithPorts(clusterResource, lhs, rhs, false, false, + true, true, lhs, rhs); + + lhs = Resource.newInstance(1, 1, ValueRanges.iniFromExpression("[2-10]")); + rhs = Resource.newInstance(1, 1, ValueRanges.iniFromExpression("[1-20]")); + assertResourcesOperationsWithPorts(clusterResource, lhs, rhs, true, true, + false, false, rhs, lhs); + + lhs = Resource.newInstance(1, 1, ValueRanges.iniFromExpression("[2-10]")); + rhs = Resource.newInstance(1, 1, ValueRanges.iniFromExpression("[1-20]")); + assertResourcesOperationsWithPorts(clusterResource, lhs, rhs, true, true, + false, false, rhs, lhs); + + lhs = Resource.newInstance(1, 1, ValueRanges.iniFromExpression("[2-10]")); + rhs = Resource.newInstance(1, 1, ValueRanges.iniFromExpression("[1-20]")); + assertResourcesOperationsWithPorts(clusterResource, lhs, rhs, true, true, + false, false, rhs, lhs); + + } + + private void assertResourcesOperationsWithPorts(Resource clusterResource, + Resource lhs, Resource rhs, boolean lessThan, boolean lessThanOrEqual, + boolean greaterThan, boolean greaterThanOrEqual, Resource max, + Resource min) { + + Assert.assertEquals("Less Than operation is wrongly calculated.", lessThan, + Resources.lessThanWithPorts(resourceCalculator, clusterResource, lhs, + rhs)); + + Assert.assertEquals( + "Less Than Or Equal To operation is wrongly calculated.", + lessThanOrEqual, Resources.lessThanOrEqualWithPorts(resourceCalculator, + clusterResource, lhs, rhs)); + + Assert.assertEquals("Greater Than operation is wrongly calculated.", + greaterThan, Resources.greaterThanWithPorts(resourceCalculator, + clusterResource, lhs, rhs)); + + Assert.assertEquals( + "Greater Than Or Equal To operation is wrongly calculated.", + greaterThanOrEqual, Resources.greaterThanOrEqualWithPorts( + resourceCalculator, clusterResource, lhs, rhs)); + + Assert.assertEquals("Max(value) Operation wrongly calculated.", max, + Resources.maxWithPorts(resourceCalculator, clusterResource, lhs, rhs)); + + Assert.assertEquals("Min(value) operation is wrongly calculated.", min, + Resources.minWithPorts(resourceCalculator, clusterResource, lhs, rhs)); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java index fc30a80..5dec4ef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java @@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ValueRanges; import org.apache.hadoop.yarn.util.Records; public abstract class RegisterNodeManagerRequest { @@ -50,6 +51,15 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, List containerStatuses, List runningApplications, Set nodeLabels, Resource physicalResource) { + return newInstance(nodeId, httpPort, resource, nodeManagerVersionId, + containerStatuses, runningApplications, nodeLabels, physicalResource, null); + } + + public static RegisterNodeManagerRequest newInstance(NodeId nodeId, + int httpPort, Resource resource, String nodeManagerVersionId, + List containerStatuses, + List runningApplications, Set nodeLabels, + Resource physicalResource, ValueRanges ports) { RegisterNodeManagerRequest request = Records.newRecord(RegisterNodeManagerRequest.class); request.setHttpPort(httpPort); @@ -60,6 +70,7 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId, request.setRunningApplications(runningApplications); request.setNodeLabels(nodeLabels); request.setPhysicalResource(physicalResource); + request.setLocalUsedPortsSnapshot(ports); return request; } @@ -112,4 +123,8 @@ public abstract void setRunningApplications( * @param physicalResource Physical resources in the node. */ public abstract void setPhysicalResource(Resource physicalResource); + + public abstract void setLocalUsedPortsSnapshot(ValueRanges ports); + + public abstract ValueRanges getLocalUsedPortsSnapshot() ; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java index eda06d0..afe2ec3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java @@ -29,15 +29,22 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.api.records.ValueRanges; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeLabelPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ValueRangesPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ResourceUtilizationPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ResourceUtilizationProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ValueRangesProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder; @@ -56,6 +63,7 @@ private List containerStatuses = null; private List runningApplications = null; private Set labels = null; + private ValueRanges localUsedPortsSnapshot = null; /** Physical resources in the node. */ private Resource physicalResource = null; @@ -100,6 +108,10 @@ private synchronized void mergeLocalToBuilder() { if (this.physicalResource != null) { builder.setPhysicalResource(convertToProtoFormat(this.physicalResource)); } + if (this.localUsedPortsSnapshot != null) { + builder + .setLocalUsedPortsSnapshot(convertToProtoFormat(this.localUsedPortsSnapshot)); + } } private synchronized void addNMContainerStatusesToProto() { @@ -358,6 +370,27 @@ private synchronized void initNodeLabels() { } } + @Override + public synchronized ValueRanges getLocalUsedPortsSnapshot() { + RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder; + if (this.localUsedPortsSnapshot != null) { + return this.localUsedPortsSnapshot; + } + if (!p.hasLocalUsedPortsSnapshot()) { + return null; + } + this.localUsedPortsSnapshot = + convertFromProtoFormat(p.getLocalUsedPortsSnapshot()); + return this.localUsedPortsSnapshot; + } + + @Override + public synchronized void setLocalUsedPortsSnapshot(ValueRanges ports) { + maybeInitBuilder(); + builder.clearLocalUsedPortsSnapshot(); + localUsedPortsSnapshot = ports; + } + private static NodeLabelPBImpl convertFromProtoFormat(NodeLabelProto p) { return new NodeLabelPBImpl(p); } @@ -400,4 +433,12 @@ private static NMContainerStatusProto convertToProtoFormat( NMContainerStatus c) { return ((NMContainerStatusPBImpl)c).getProto(); } -} \ No newline at end of file + + private static ValueRanges convertFromProtoFormat(ValueRangesProto proto) { + return new ValueRangesPBImpl(proto); + } + + private ValueRangesProto convertToProtoFormat(ValueRanges m) { + return ((ValueRangesPBImpl) m).getProto(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java index 440cd0a..bde5e3b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.api.records.ValueRanges; import org.apache.hadoop.yarn.util.Records; /** @@ -132,4 +133,8 @@ public abstract void setIncreasedContainers( @Unstable public abstract void setOpportunisticContainersStatus( OpportunisticContainersStatus opportunisticContainersStatus); + + public abstract ValueRanges getLocalUsedPortsSnapshot(); + + public abstract void setLocalUsedPortsSnapshot(ValueRanges ports); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java index 8aebc6f..2adcee3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java @@ -28,16 +28,19 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.api.records.ValueRanges; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourceUtilizationPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ValueRangesPBImpl; import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ValueRangesProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProtoOrBuilder; @@ -57,6 +60,7 @@ private NodeHealthStatus nodeHealthStatus = null; private List keepAliveApplications = null; private List increasedContainers = null; + private ValueRanges localUsedPortsSnapshot = null; public NodeStatusPBImpl() { builder = NodeStatusProto.newBuilder(); @@ -90,6 +94,11 @@ private synchronized void mergeLocalToBuilder() { if (this.increasedContainers != null) { addIncreasedContainersToProto(); } + + if (this.localUsedPortsSnapshot != null) { + builder + .setLocalUsedPortsSnapshot(convertToProtoFormat(this.localUsedPortsSnapshot)); + } } private synchronized void mergeLocalToProto() { @@ -487,4 +496,33 @@ private ContainerProto convertToProtoFormat( Container c) { return ((ContainerPBImpl)c).getProto(); } + + @Override + public ValueRanges getLocalUsedPortsSnapshot() { + NodeStatusProtoOrBuilder p = viaProto ? proto : builder; + if (this.localUsedPortsSnapshot != null) { + return this.localUsedPortsSnapshot; + } + if (!p.hasLocalUsedPortsSnapshot()) { + return null; + } + this.localUsedPortsSnapshot = + convertFromProtoFormat(p.getLocalUsedPortsSnapshot()); + return this.localUsedPortsSnapshot; + } + + @Override + public void setLocalUsedPortsSnapshot(ValueRanges ports) { + maybeInitBuilder(); + builder.clearLocalUsedPortsSnapshot(); + localUsedPortsSnapshot = ports; + } + + private static ValueRanges convertFromProtoFormat(ValueRangesProto proto) { + return new ValueRangesPBImpl(proto); + } + + private ValueRangesProto convertToProtoFormat(ValueRanges m) { + return ((ValueRangesPBImpl) m).getProto(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto index 98b172d..28f1cd5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto @@ -40,6 +40,7 @@ message NodeStatusProto { optional ResourceUtilizationProto node_utilization = 7; repeated ContainerProto increased_containers = 8; optional OpportunisticContainersStatusProto opportunistic_containers_status = 9; + optional ValueRangesProto local_used_ports_snapshot = 10; } message OpportunisticContainersStatusProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 4e05fba..1f6fe72 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -64,6 +64,7 @@ message RegisterNodeManagerRequestProto { repeated ApplicationIdProto runningApplications = 7; optional NodeLabelsProto nodeLabels = 8; optional ResourceProto physicalResource = 9; + optional ValueRangesProto local_used_ports_snapshot = 240; } message RegisterNodeManagerResponseProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index ade42e3..f0c5ba6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.api.records.ValueRanges; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -89,6 +90,7 @@ import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; +import org.apache.hadoop.yarn.util.PortsInfo; import org.apache.hadoop.yarn.util.YarnVersionInfo; import com.google.common.annotations.VisibleForTesting; @@ -149,6 +151,14 @@ private NMNodeLabelsHandler nodeLabelsHandler; private final NodeLabelsProvider nodeLabelsProvider; + private boolean enablePortsAsResource; + private boolean enablePortsBitSetStore; + /** + * this parameter is circle controller for updating local allocated ports + * info, since the ports info is big. we can control the update frequency to + * have balance with cluster scale and ports info's accuracy + */ + private int numOfRoundsToUpdatePorts; public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { @@ -184,7 +194,26 @@ protected void serviceInit(Configuration conf) throws Exception { LOG.info("Nodemanager resources: memory set to " + memoryMb + "MB."); LOG.info("Nodemanager resources: vcores set to " + virtualCores + "."); - this.totalResource = Resource.newInstance(memoryMb, virtualCores); + numOfRoundsToUpdatePorts = + conf.getInt(YarnConfiguration.NM_PORTS_UPDATE_ROUNDS, + YarnConfiguration.DEFAULT_NM_PORTS_UPDATE_ROUNDS); + + enablePortsAsResource = + conf.getBoolean(YarnConfiguration.PORTS_AS_RESOURCE_ENABLE, + YarnConfiguration.DEFAULT_PORTS_AS_RESOURCE_ENABLE); + + enablePortsBitSetStore = + conf.getBoolean(YarnConfiguration.PORTS_BITSET_STORE_ENABLE, + YarnConfiguration.DEFAULT_PORTS_BITSET_STORE_ENABLE); + + ValueRanges ports = null; + if (enablePortsAsResource) { + ports = ValueRanges.iniFromExpression(conf.get(YarnConfiguration.NM_PORTS, + YarnConfiguration.DEFAULT_NM_PORTS), enablePortsBitSetStore); + } + + + this.totalResource = Resource.newInstance(memoryMb, virtualCores, ports); metrics.addResource(totalResource); // Get actual node physical resources @@ -352,10 +381,14 @@ protected void registerWithRM() // during RM recovery synchronized (this.context) { List containerReports = getNMContainerStatuses(); + ValueRanges ports = null; + if (enablePortsAsResource) { + ports = new PortsInfo().GetAllocatedPorts(enablePortsBitSetStore); + } RegisterNodeManagerRequest request = RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource, nodeManagerVersionId, containerReports, getRunningApplications(), - nodeLabels, physicalResource); + nodeLabels, physicalResource, ports); if (containerReports != null) { LOG.info("Registering with RM using containers :" + containerReports); } @@ -473,7 +506,7 @@ protected NodeStatus getNodeStatus(int responseId) throws IOException { NodeStatus nodeStatus = NodeStatus.newInstance(nodeId, responseId, containersStatuses, createKeepAliveApplicationList(), nodeHealthStatus, - containersUtilization, nodeUtilization, increasedContainers); + containersUtilization, nodeUtilization, increasedContainers); nodeStatus.setOpportunisticContainersStatus( getOpportunisticContainersStatus()); @@ -759,7 +792,6 @@ public long getRMIdentifier() { } protected void startStatusUpdater() { - statusUpdaterRunnable = new StatusUpdaterRunnable(); statusUpdater = new Thread(statusUpdaterRunnable, "Node Status Updater"); @@ -1031,9 +1063,22 @@ public void run() { // Send heartbeat try { NodeHeartbeatResponse response = null; + ValueRanges lastUpdatePorts = null; + int rounds = 0; Set nodeLabelsForHeartbeat = nodeLabelsHandler.getNodeLabelsForHeartbeat(); NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID); + if (enablePortsAsResource) { + if (rounds++ >= numOfRoundsToUpdatePorts) { + ValueRanges ports = + new PortsInfo().GetAllocatedPorts(enablePortsBitSetStore); + if (lastUpdatePorts == null || !lastUpdatePorts.equals(ports)) { + nodeStatus.setLocalUsedPortsSnapshot(ports); + lastUpdatePorts = ports; + } + rounds = 0; + } + } NodeHeartbeatRequest request = NodeHeartbeatRequest.newInstance(nodeStatus, NodeStatusUpdaterImpl.this.context diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java index 0b599a8..732fc84 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java @@ -55,6 +55,8 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ValueRanges; import org.apache.hadoop.yarn.api.records.SignalContainerCommand; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; @@ -84,6 +86,7 @@ import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader; import org.apache.hadoop.yarn.util.Apps; import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper; +import org.apache.hadoop.yarn.util.PortsInfo; import com.google.common.annotations.VisibleForTesting; @@ -156,6 +159,23 @@ public static String expandEnvironment(String var, return var; } + private boolean validatePortsRequest(Resource resource) { + if (resource == null || resource.getPorts() == null + || resource.getPorts().getRangesCount() == 0) { + return true; // no ports request + } + ValueRanges allocatedPorts = new PortsInfo().GetAllocatedPorts(false); + ValueRanges requestPorts = resource.getPorts(); + if (requestPorts.equals(requestPorts.minusSelf(allocatedPorts))) { + return true; + } else { + LOG.info("no available ports, allocated ports:" + + allocatedPorts.toString() + ", required:" + requestPorts.toString()); + return false; + } + } + + @Override @SuppressWarnings("unchecked") // dispatcher not typed public Integer call() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index e6f2bb2..4787c47 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ValueRanges; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -108,6 +109,8 @@ private Server server; private InetSocketAddress resourceTrackerAddress; private String minimumNodeManagerVersion; + private boolean enablePortsAsResource; + private boolean enablePortsBitSetStore; private int minAllocMb; private int minAllocVcores; @@ -143,6 +146,13 @@ protected void serviceInit(Configuration conf) throws Exception { YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS, YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT); + enablePortsAsResource = + conf.getBoolean(YarnConfiguration.PORTS_AS_RESOURCE_ENABLE, + YarnConfiguration.DEFAULT_PORTS_AS_RESOURCE_ENABLE); + enablePortsBitSetStore = + conf.getBoolean(YarnConfiguration.PORTS_BITSET_STORE_ENABLE, + YarnConfiguration.DEFAULT_PORTS_BITSET_STORE_ENABLE); + RackResolver.init(conf); nextHeartBeatInterval = conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, @@ -352,6 +362,10 @@ public RegisterNodeManagerResponse registerNodeManager( response.setNodeAction(NodeAction.SHUTDOWN); return response; } + // reset illegal resource report + if (!this.enablePortsAsResource) { + capability.setPorts(null); + } // check if node's capacity is load from dynamic-resources.xml String nid = nodeId.toString(); @@ -386,8 +400,53 @@ public RegisterNodeManagerResponse registerNodeManager( response.setNMTokenMasterKey(nmTokenSecretManager .getCurrentKey()); - RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort, - resolve(host), capability, nodeManagerVersion, physicalResource); + ValueRanges localUsedPorts = null; + if (this.enablePortsAsResource) { + localUsedPorts = request.getLocalUsedPortsSnapshot(); + if (this.enablePortsBitSetStore + && request.getLocalUsedPortsSnapshot() != null) { + localUsedPorts = + ValueRanges.convertToBitSet(request.getLocalUsedPortsSnapshot()); + } + } + RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort, + resolve(host), capability, nodeManagerVersion, physicalResource, localUsedPorts); + if (this.enablePortsAsResource && this.enablePortsBitSetStore) { + if (rmNode.getTotalCapability().getPorts() != null) { + ValueRanges totalPorts = + ValueRanges.convertToBitSet(rmNode.getTotalCapability().getPorts()); + rmNode.getTotalCapability().setPorts(totalPorts); + } + if (rmNode.getContainerAllocatedPorts() == null) { + rmNode.setContainerAllocatedPorts(ValueRanges.newInstance()); + rmNode.getContainerAllocatedPorts().setByteStoreEnable(true); + } + ValueRanges containerAllocatedPorts = + ValueRanges.convertToBitSet(rmNode.getContainerAllocatedPorts()); + rmNode.setContainerAllocatedPorts(containerAllocatedPorts); + + if (rmNode.getLocalUsedPortsSnapshot() != null) { + ValueRanges localUsedPortsSnapshot = + ValueRanges.convertToBitSet(rmNode.getLocalUsedPortsSnapshot()); + rmNode.setLocalUsedPortsSnapshot(localUsedPortsSnapshot); + } + } + + if (this.enablePortsAsResource) { + rmNode.setAvailablePorts( + getAvailablePorts( + rmNode.getTotalCapability().getPorts(), + rmNode.getContainerAllocatedPorts(), + rmNode.getLocalUsedPortsSnapshot())); + if (this.enablePortsBitSetStore && rmNode.getAvailablePorts() != null) { + rmNode.getAvailablePorts().setByteStoreEnable(true); + ValueRanges availablePorts = + ValueRanges.convertToBitSet(rmNode.getAvailablePorts()); + rmNode.setAvailablePorts(availablePorts); + } + } + + RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode); if (oldNode == null) { @@ -607,7 +666,28 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) nodeHeartBeatResponse.setContainerQueuingLimit( this.rmContext.getNodeManagerQueueLimitCalculator() .createContainerQueuingLimit()); + } + // 8. Update the local used ports snapshot + if (this.enablePortsAsResource) { + ValueRanges ports = remoteNodeStatus.getLocalUsedPortsSnapshot(); + if (ports != null) { + rmNode.setLocalUsedPortsSnapshot(ports); + if (this.enablePortsBitSetStore) { + ValueRanges LocalUsedPorts = + ValueRanges.convertToBitSet(rmNode.getLocalUsedPortsSnapshot()); + rmNode.setLocalUsedPortsSnapshot(LocalUsedPorts); + } + ValueRanges availablePorts = null; + if (rmNode.getTotalCapability().getPorts() != null) { + availablePorts = + getAvailablePorts(rmNode.getTotalCapability().getPorts(), + rmNode.getContainerAllocatedPorts(), + rmNode.getLocalUsedPortsSnapshot()); + } + rmNode.setAvailablePorts(availablePorts); + } } + return nodeHeartBeatResponse; } @@ -631,6 +711,14 @@ private void setAppCollectorsMapToResponse( response.setAppCollectorsMap(liveAppCollectorsMap); } + private static ValueRanges getAvailablePorts(ValueRanges total, + ValueRanges allocated, ValueRanges localUsed) { + if (total == null) { + return null; + } + return total.minusSelf(allocated).minusSelf(localUsed); + } + private void updateAppCollectorsMap(NodeHeartbeatRequest request) { Map registeredCollectorsMap = request.getRegisteredCollectors(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index ab15c95..8602412 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.api.records.ValueRanges; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; @@ -187,4 +188,46 @@ void updateNodeHeartbeatResponseForUpdatedContainers( * @return the decommissioning timeout in second. */ Integer getDecommissioningTimeout(); + + /** + * Get local used ports snapshot. + * + * @return ports range. + */ + public ValueRanges getLocalUsedPortsSnapshot(); + + /** + * update {@link ValueRanges} local used ports snapshot. + * + * @param use {@link ValueRanges} to update + */ + public void setLocalUsedPortsSnapshot(ValueRanges ports); + + /** + * Get available ports. + * + * @return ports range. + */ + public ValueRanges getAvailablePorts(); + + /** + * update {@link ValueRanges} available ports. + * + * @param use {@link ValueRanges} to update + */ + public void setAvailablePorts(ValueRanges ports); + + /** + * Get container allocated ports. + * + * @return ports range. + */ + public ValueRanges getContainerAllocatedPorts(); + + /** + * update {@link ValueRanges} container allocated ports. + * + * @param use {@link ValueRanges} to update + */ + public void setContainerAllocatedPorts(ValueRanges ports); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 1bdaa98..4a0671b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.api.records.ValueRanges; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -140,6 +141,11 @@ private OpportunisticContainersStatus opportunisticContainersStatus; private final ContainerAllocationExpirer containerAllocationExpirer; + /** Port ranges used in the host. */ + private ValueRanges localUsedPortsSnapshot = null; + private ValueRanges containerAllocatedPorts = null; + private ValueRanges availabelPorts = null; + /* set of containers that have just launched */ private final Set launchedContainers = new HashSet(); @@ -365,6 +371,13 @@ public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, int cmPort, int httpPort, Node node, Resource capability, String nodeManagerVersion, Resource physResource) { + this(nodeId, context, hostName, cmPort, httpPort, node, capability, + nodeManagerVersion, physResource, null); + } + + public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, + int cmPort, int httpPort, Node node, Resource capability, + String nodeManagerVersion, Resource physResource, ValueRanges ports) { this.nodeId = nodeId; this.context = context; this.hostName = hostName; @@ -391,6 +404,8 @@ public RMNodeImpl(NodeId nodeId, RMContext context, String hostName, this.nodeUpdateQueue = new ConcurrentLinkedQueue(); this.containerAllocationExpirer = context.getContainerAllocationExpirer(); + + this.localUsedPortsSnapshot = ports; } @Override @@ -1477,6 +1492,27 @@ private void handleLogAggregationStatus( } @Override + public ValueRanges getAvailablePorts() { + return availabelPorts; + } + + @Override + public void setAvailablePorts(ValueRanges ports) { + this.availabelPorts = ports; + } + + @Override + public ValueRanges getContainerAllocatedPorts() { + return containerAllocatedPorts; + } + + @Override + public void setContainerAllocatedPorts(ValueRanges ports) { + this.containerAllocatedPorts = ports; + } + + + @Override public List pullNewlyIncreasedContainers() { try { writeLock.lock(); @@ -1534,4 +1570,14 @@ public void setUntrackedTimeStamp(long ts) { public Integer getDecommissioningTimeout() { return decommissioningTimeout; } + + @Override + public ValueRanges getLocalUsedPortsSnapshot() { + return this.localUsedPortsSnapshot; + } + + @Override + public void setLocalUsedPortsSnapshot(ValueRanges ports) { + this.localUsedPortsSnapshot = ports; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 272537c..70dbc5d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -19,13 +19,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import java.util.ArrayList; -import java.util.LinkedList; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; -import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -36,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.api.records.ValueRanges; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; @@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.util.resource.Resources; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; @@ -66,6 +67,7 @@ ResourceUtilization.newInstance(0, 0, 0f); private volatile ResourceUtilization nodeUtilization = ResourceUtilization.newInstance(0, 0, 0f); + private Resource allocatedOpportunistic = Resources.clone(Resources.none()); /* set of containers that are allocated containers */ private final Map launchedContainers = @@ -79,8 +81,14 @@ public SchedulerNode(RMNode node, boolean usePortForNodeName, Set labels) { this.rmNode = node; - this.unallocatedResource = Resources.clone(node.getTotalCapability()); - this.totalResource = Resources.clone(node.getTotalCapability()); + Resource capacity = node.getTotalCapability(); + if (capacity != null && capacity.getPorts() != null) { + this.unallocatedResource = Resources.cloneWithPorts(capacity); + this.totalResource = Resources.cloneWithPorts(capacity); + } else { + this.unallocatedResource = Resources.clone(capacity); + this.totalResource = Resources.clone(capacity); + } if (usePortForNodeName) { nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort(); } else { @@ -188,6 +196,10 @@ public synchronized Resource getUnallocatedResource() { return this.unallocatedResource; } + public synchronized ValueRanges getAvailablePorts() { + return this.rmNode.getAvailablePorts(); + } + /** * Get allocated resources on the node. * @return Allocated resources on the node @@ -266,7 +278,65 @@ public synchronized void containerStarted(ContainerId containerId) { info.launchedOnNode = true; } } + + private ValueRanges calculateAvailablePorts() { + if (rmNode.getTotalCapability().getPorts() == null) { + return null; + } + return rmNode.getTotalCapability().getPorts() + .minusSelf(rmNode.getContainerAllocatedPorts()) + .minusSelf(rmNode.getLocalUsedPortsSnapshot()); + } + + /** + * Update allocation based stats. + * @param resource - Resource allocated/released + * @param increase - whether resources are allocated or released + */ + private synchronized void updateResourceAllocation( + Resource resource, boolean increase, boolean opportunistic) { + if (resource == null) { + LOG.error("Invalid update on resource allocation " + + rmNode.getNodeAddress()); + return; + } + if (increase) { + if (opportunistic) { + Resources.addTo(allocatedOpportunistic, resource); + } else { + if (resource.getPorts() != null) { + Resources.addToWithPorts(allocatedResource, resource); + updateAllocatedPorts(); + } else { + Resources.addTo(allocatedResource, resource); + } + } + } else { + if (opportunistic) { + Resources.subtractFrom(allocatedOpportunistic, resource); + } else { + if (resource.getPorts() != null) { + Resources.subtractFromWithPorts(allocatedResource, resource); + updateAllocatedPorts(); + } else { + Resources.subtractFrom(allocatedResource, resource); + } + } + } + } + private void updateAllocatedPorts() { + rmNode.setContainerAllocatedPorts(allocatedResource.getPorts()); + + if (rmNode.getTotalCapability().getPorts() != null + && rmNode.getTotalCapability().getPorts().getBitSetStore() != null) { + ValueRanges containerAllocatedPorts = + ValueRanges.convertToBitSet(rmNode.getContainerAllocatedPorts()); + rmNode.setContainerAllocatedPorts(containerAllocatedPorts); + } + rmNode.setAvailablePorts(calculateAvailablePorts()); + } + /** * Add unallocated resources to the node. This is used when unallocating a * container. @@ -278,8 +348,16 @@ private synchronized void addUnallocatedResource(Resource resource) { + rmNode.getNodeAddress()); return; } - Resources.addTo(unallocatedResource, resource); - Resources.subtractFrom(allocatedResource, resource); + + if (resource.getPorts() != null) { + Resources.addToWithPorts(unallocatedResource, resource); + Resources.subtractFromWithPorts(allocatedResource, resource); + rmNode.setContainerAllocatedPorts(allocatedResource.getPorts()); + rmNode.setAvailablePorts(calculateAvailablePorts()); + } else { + Resources.addTo(unallocatedResource, resource); + Resources.subtractFrom(allocatedResource, resource); + } } /** @@ -294,8 +372,15 @@ public synchronized void deductUnallocatedResource(Resource resource) { + rmNode.getNodeAddress()); return; } - Resources.subtractFrom(unallocatedResource, resource); - Resources.addTo(allocatedResource, resource); + if (resource.getPorts() != null) { + Resources.subtractFromWithPorts(unallocatedResource, resource); + Resources.addToWithPorts(allocatedResource, resource); + rmNode.setContainerAllocatedPorts(allocatedResource.getPorts()); + rmNode.setAvailablePorts(calculateAvailablePorts()); + } else { + Resources.subtractFrom(unallocatedResource, resource); + Resources.addTo(allocatedResource, resource); + } } /** @@ -471,4 +556,4 @@ public ContainerInfo(RMContainer container, boolean launchedOnNode) { this.launchedOnNode = launchedOnNode; } } -} \ No newline at end of file +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index d15431e..7c3d3fc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -141,7 +141,7 @@ public LeafQueue(CapacitySchedulerContext cs, // One time initialization is enough since it is static ordering policy this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps(); - + if(LOG.isDebugEnabled()) { LOG.debug("LeafQueue:" + " name=" + queueName + ", fullname=" + getQueuePath()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java index f753d31..6783a90 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java @@ -30,30 +30,29 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.ValueRanges; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; - import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; -import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; - -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState; - import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -64,11 +63,16 @@ */ public class RegularContainerAllocator extends AbstractContainerAllocator { private static final Log LOG = LogFactory.getLog(RegularContainerAllocator.class); + private boolean enablePortsAsResource; public RegularContainerAllocator(FiCaSchedulerApp application, ResourceCalculator rc, RMContext rmContext, ActivitiesManager activitiesManager) { super(application, rc, rmContext, activitiesManager); + this.enablePortsAsResource = + rmContext.getYarnConfiguration().getBoolean( + YarnConfiguration.PORTS_AS_RESOURCE_ENABLE, + YarnConfiguration.DEFAULT_PORTS_AS_RESOURCE_ENABLE); } private boolean checkHeadroom(Resource clusterResource, @@ -484,6 +488,19 @@ private ContainerAllocation assignContainersOnNode(Resource clusterResource, ActivityDiagnosticConstant.PRIORITY_SKIPPED); return ContainerAllocation.PRIORITY_SKIPPED; } + + private boolean validatePortsAvailable(ValueRanges availablePorts, + ValueRanges requiredPorts) { + if (availablePorts == null || requiredPorts == null) { + // no ports request + return true; + } + if (requiredPorts.isLessOrEqual(availablePorts)) { + return true; + } else { + return false; + } + } private ContainerAllocation assignContainer(Resource clusterResource, FiCaSchedulerNode node, SchedulerRequestKey schedulerKey, @@ -516,6 +533,14 @@ private ContainerAllocation assignContainer(Resource clusterResource, boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer( schedulerKey, capability); + + if (enablePortsAsResource + && !validatePortsAvailable( + node.getAvailablePorts(), capability.getPorts())) { + LOG.info("no available ports, current available:" + + node.getAvailablePorts() + ", required:" + capability.getPorts()); + return ContainerAllocation.LOCALITY_SKIPPED; + } // Can we allocate a container on this node? long availableContainers = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 7f58711..37c8939 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.api.records.ValueRanges; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; @@ -284,6 +285,34 @@ public Integer getDecommissioningTimeout() { public Resource getPhysicalResource() { return this.physicalResource; } + + @Override + public ValueRanges getAvailablePorts() { + return null; + } + + @Override + public void setAvailablePorts(ValueRanges ports) { + } + + @Override + public ValueRanges getContainerAllocatedPorts() { + return null; + } + + @Override + public void setContainerAllocatedPorts(ValueRanges ports) { + } + + @Override + public ValueRanges getLocalUsedPortsSnapshot() { + return null; + } + + @Override + public void setLocalUsedPortsSnapshot(ValueRanges port) { + } + }; private static RMNode buildRMNode(int rack, final Resource perNode, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestPortsAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestPortsAllocation.java new file mode 100644 index 0000000..1057e0b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestPortsAllocation.java @@ -0,0 +1,433 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.ConcurrentModificationException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CyclicBarrier; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.QueueACL; +import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.ValueRanges; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Matchers; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +public class TestPortsAllocation { + + private static final Log LOG = LogFactory.getLog(TestLeafQueue.class); + + private final RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); + + RMContext rmContext; + RMContext spyRMContext; + ResourceRequest amResourceRequest; + CapacityScheduler cs; + CapacitySchedulerConfiguration csConf; + CapacitySchedulerContext csContext; + + CSQueue root; + Map queues = new HashMap(); + + final static int GB = 1024; + final static String DEFAULT_RACK = "/default"; + + private final ResourceCalculator resourceCalculator = + new DominantResourceCalculator(); + + @Before + public void setUp() throws Exception { + CapacityScheduler spyCs = new CapacityScheduler(); + cs = spy(spyCs); + rmContext = TestUtils.getMockRMContext(); + spyRMContext = spy(rmContext); + + ConcurrentMap spyApps = + spy(new ConcurrentHashMap()); + RMApp rmApp = mock(RMApp.class); + when(rmApp.getRMAppAttempt((ApplicationAttemptId)Matchers.any())).thenReturn(null); + amResourceRequest = mock(ResourceRequest.class); + when(amResourceRequest.getCapability()).thenReturn( + Resources.createResource(0, 0)); + when(rmApp.getAMResourceRequests()).thenReturn( + Collections.singletonList(amResourceRequest)); + Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId)Matchers.any()); + when(spyRMContext.getRMApps()).thenReturn(spyApps); + + csConf = + new CapacitySchedulerConfiguration(); + csConf.setBoolean("yarn.scheduler.capacity.user-metrics.enable", true); + csConf.setBoolean(YarnConfiguration.PORTS_AS_RESOURCE_ENABLE, true); + final String newRoot = "root" + System.currentTimeMillis(); + setupQueueConfiguration(csConf, newRoot); + YarnConfiguration conf = new YarnConfiguration(); + cs.setConf(conf); + + csContext = mock(CapacitySchedulerContext.class); + when(csContext.getConfiguration()).thenReturn(csConf); + when(csContext.getConf()).thenReturn(conf); + when(csContext.getMinimumResourceCapability()). + thenReturn(Resources.createResource(GB, 1)); + when(csContext.getMaximumResourceCapability()). + thenReturn(Resources.createResource(16*GB, 32)); + when(csContext.getClusterResource()). + thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32)); + when(csContext.getResourceCalculator()). + thenReturn(resourceCalculator); + when(csContext.getRMContext()).thenReturn(rmContext); + RMContainerTokenSecretManager containerTokenSecretManager = + new RMContainerTokenSecretManager(conf); + containerTokenSecretManager.rollMasterKey(); + when(csContext.getContainerTokenSecretManager()).thenReturn( + containerTokenSecretManager); + + root = + CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null, + CapacitySchedulerConfiguration.ROOT, + queues, queues, + TestUtils.spyHook); + + cs.setRMContext(spyRMContext); + cs.init(csConf); + cs.start(); + + when(spyRMContext.getScheduler()).thenReturn(cs); + when(spyRMContext.getYarnConfiguration()) + .thenReturn(new YarnConfiguration()); + when(cs.getNumClusterNodes()).thenReturn(3); + } + + private static final String A = "a"; + private static final String B = "b"; + private static final String C = "c"; + private static final String C1 = "c1"; + private static final String D = "d"; + private static final String E = "e"; + private void setupQueueConfiguration( + CapacitySchedulerConfiguration conf, + final String newRoot) { + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {newRoot}); + conf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT, 100); + conf.setAcl(CapacitySchedulerConfiguration.ROOT, + QueueACL.SUBMIT_APPLICATIONS, " "); + + final String Q_newRoot = CapacitySchedulerConfiguration.ROOT + "." + newRoot; + conf.setQueues(Q_newRoot, new String[] {A, B, C, D, E}); + conf.setCapacity(Q_newRoot, 100); + conf.setMaximumCapacity(Q_newRoot, 100); + conf.setAcl(Q_newRoot, QueueACL.SUBMIT_APPLICATIONS, " "); + + final String Q_A = Q_newRoot + "." + A; + conf.setCapacity(Q_A, 8.5f); + conf.setMaximumCapacity(Q_A, 20); + conf.setAcl(Q_A, QueueACL.SUBMIT_APPLICATIONS, "*"); + + final String Q_B = Q_newRoot + "." + B; + conf.setCapacity(Q_B, 80); + conf.setMaximumCapacity(Q_B, 99); + conf.setAcl(Q_B, QueueACL.SUBMIT_APPLICATIONS, "*"); + + final String Q_C = Q_newRoot + "." + C; + conf.setCapacity(Q_C, 1.5f); + conf.setMaximumCapacity(Q_C, 10); + conf.setAcl(Q_C, QueueACL.SUBMIT_APPLICATIONS, " "); + + conf.setQueues(Q_C, new String[] {C1}); + + final String Q_C1 = Q_C + "." + C1; + conf.setCapacity(Q_C1, 100); + + final String Q_D = Q_newRoot + "." + D; + conf.setCapacity(Q_D, 9); + conf.setMaximumCapacity(Q_D, 11); + conf.setAcl(Q_D, QueueACL.SUBMIT_APPLICATIONS, "user_d"); + + final String Q_E = Q_newRoot + "." + E; + conf.setCapacity(Q_E, 1); + conf.setMaximumCapacity(Q_E, 1); + conf.setAcl(Q_E, QueueACL.SUBMIT_APPLICATIONS, "user_e"); + + } + + static LeafQueue stubLeafQueue(LeafQueue queue) { + // Mock some methods for ease in these unit tests + + // 1. Stub out LeafQueue.parent.completedContainer + CSQueue parent = queue.getParent(); + doNothing().when(parent).completedContainer( + any(Resource.class), any(FiCaSchedulerApp.class), any(FiCaSchedulerNode.class), + any(RMContainer.class), any(ContainerStatus.class), + any(RMContainerEventType.class), any(CSQueue.class), anyBoolean()); + + // Stub out parent queue's accept and apply. + doReturn(true).when(parent).accept(any(Resource.class), + any(ResourceCommitRequest.class)); + doNothing().when(parent).apply(any(Resource.class), + any(ResourceCommitRequest.class)); + + return queue; + } + + public boolean hasQueueACL(List aclInfos, QueueACL acl) { + for (QueueUserACLInfo aclInfo : aclInfos) { + if (aclInfo.getUserAcls().contains(acl)) { + return true; + } + } + return false; + } + + @Test + public void testFifoAssignmentWithPorts() throws Exception { + + LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A)); + + a.setOrderingPolicy(new FifoOrderingPolicy()); + + String host_0_0 = "127.0.0.1"; + String rack_0 = "rack_0"; + FiCaSchedulerNode node_0_0 = + TestUtils.getMockNodeForPortsCaculate(host_0_0, rack_0, 0, 16 * GB, 10, + ValueRanges.iniFromExpression("[1-100]"),csConf); + + final int numNodes = 4; + Resource clusterResource = + Resources.createResource(numNodes * (16 * GB), numNodes * 16, + ValueRanges.iniFromExpression("[1-100]")); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + String user_0 = "user_0"; + + final ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = + spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, + mock(ActiveUsersManager.class), spyRMContext)); + a.submitApplicationAttempt(app_0, user_0); + + final ApplicationAttemptId appAttemptId_1 = + TestUtils.getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = + spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a, + mock(ActiveUsersManager.class), spyRMContext)); + a.submitApplicationAttempt(app_1, user_0); + + Priority priority = TestUtils.createMockPriority(1); + List app_0_requests_0 = new ArrayList(); + List app_1_requests_0 = new ArrayList(); + + app_0_requests_0.clear(); + app_0_requests_0.add(TestUtils.createResourceRequest(ResourceRequest.ANY, + 2 * GB, ValueRanges.iniFromExpression("[1-10]"), 1, true, priority, + recordFactory)); + app_0.updateResourceRequests(app_0_requests_0); + + app_1_requests_0.clear(); + app_1_requests_0.add(TestUtils.createResourceRequest(ResourceRequest.ANY, + 1 * GB, ValueRanges.iniFromExpression("[11-15]"), 1, true, priority, + recordFactory)); + app_1.updateResourceRequests(app_1_requests_0); + + a.assignContainers(clusterResource, node_0_0, new ResourceLimits( + clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + Assert.assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory()); + //check the available port + Assert.assertEquals(ValueRanges.iniFromExpression("[11-100]"), node_0_0.getAvailablePorts()); + a.assignContainers(clusterResource, node_0_0, new ResourceLimits( + clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + Assert.assertEquals(1 * GB, app_1.getCurrentConsumption().getMemory()); + //check the available port + Assert.assertEquals(ValueRanges.iniFromExpression("[16-100]"), node_0_0.getAvailablePorts()); + + + app_0_requests_0.clear(); + app_0_requests_0.add(TestUtils.createResourceRequest(ResourceRequest.ANY, + 1 * GB, 1, true, priority, recordFactory)); + app_0.updateResourceRequests(app_0_requests_0); + + app_1_requests_0.clear(); + app_1_requests_0.add(TestUtils.createResourceRequest(ResourceRequest.ANY, + 1 * GB, 1, true, priority, recordFactory)); + app_1.updateResourceRequests(app_1_requests_0); + + // Even thought it already has more resources, app_0 will still get + // assigned first + a.assignContainers(clusterResource, node_0_0, new ResourceLimits( + clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + Assert.assertEquals(3 * GB, app_0.getCurrentConsumption().getMemory()); + Assert.assertEquals(1 * GB, app_1.getCurrentConsumption().getMemory()); + //check the available port + Assert.assertEquals(ValueRanges.iniFromExpression("[16-100]"), node_0_0.getAvailablePorts()); + + // and only then will app_1 + a.assignContainers(clusterResource, node_0_0, new ResourceLimits( + clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + Assert.assertEquals(2 * GB, app_1.getCurrentConsumption().getMemory()); + //check the available port + Assert.assertEquals(ValueRanges.iniFromExpression("[16-100]"), node_0_0.getAvailablePorts()); + } + + @Test + public void testFifoAssignmentWithPorts2() throws Exception { + + LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A)); + + a.setOrderingPolicy(new FifoOrderingPolicy()); + + String host_0_0 = "127.0.0.1"; + String rack_0 = "rack_0"; + FiCaSchedulerNode node_0_0 = + TestUtils.getMockNodeForPortsCaculate(host_0_0, rack_0, 0, 16 * GB, 10, + ValueRanges.iniFromExpression("[1-100]"), csConf); + + final int numNodes = 4; + Resource clusterResource = + Resources.createResource(numNodes * (16 * GB), numNodes * 16, + ValueRanges.iniFromExpression("[1-100]")); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + String user_0 = "user_0"; + + final ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = + spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, + mock(ActiveUsersManager.class), spyRMContext)); + a.submitApplicationAttempt(app_0, user_0); + + final ApplicationAttemptId appAttemptId_1 = + TestUtils.getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = + spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a, + mock(ActiveUsersManager.class), spyRMContext)); + a.submitApplicationAttempt(app_1, user_0); + + Priority priority = TestUtils.createMockPriority(1); + List app_0_requests_0 = new ArrayList(); + List app_1_requests_0 = new ArrayList(); + + app_0_requests_0.clear(); + app_0_requests_0.add(TestUtils.createResourceRequest(ResourceRequest.ANY, + 2 * GB, ValueRanges.iniFromExpression("[1-10]"), 1, true, priority, + recordFactory)); + app_0.updateResourceRequests(app_0_requests_0); + + app_1_requests_0.clear(); + app_1_requests_0.add(TestUtils.createResourceRequest(ResourceRequest.ANY, + 1 * GB, ValueRanges.iniFromExpression("[8-15]"), 1, true, priority, + recordFactory)); + app_1.updateResourceRequests(app_1_requests_0); + + a.assignContainers(clusterResource, node_0_0, new ResourceLimits( + clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + // app_0 should allocate successfully + Assert.assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory()); + + a.assignContainers(clusterResource, node_0_0, new ResourceLimits( + clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + // app_1 should not be allocated as ports conflict + Assert.assertEquals(0, app_1.getCurrentConsumption().getMemory()); + // check the available port + Assert.assertEquals(ValueRanges.iniFromExpression("[11-100]"), + node_0_0.getAvailablePorts()); + + } + + + @After + public void tearDown() throws Exception { + if (cs != null) { + cs.stop(); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index 4bc5127..c0e2cac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.ValueRanges; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; @@ -52,6 +53,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; @@ -65,6 +67,7 @@ import org.apache.hadoop.yarn.util.resource.Resources; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import org.apache.hadoop.net.Node; import com.google.common.collect.Sets; import org.apache.hadoop.yarn.event.Event; @@ -184,6 +187,22 @@ public static ResourceRequest createResourceRequest(String resourceName, return request; } + public static ResourceRequest createResourceRequest(String resourceName, + int memory, ValueRanges ports, int numContainers, boolean relaxLocality, + Priority priority, RecordFactory recordFactory) { + ResourceRequest request = + recordFactory.newRecordInstance(ResourceRequest.class); + Resource capability = Resources.createResource(memory, 1, ports); + + request.setNumContainers(numContainers); + request.setResourceName(resourceName); + request.setCapability(capability); + request.setRelaxLocality(relaxLocality); + request.setPriority(priority); + request.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL); + return request; + } + public static ResourceRequest createResourceRequest( String resourceName, int memory, int numContainers, boolean relaxLocality, Priority priority, @@ -225,6 +244,31 @@ public static FiCaSchedulerNode getMockNode(String host, String rack, return node; } + public static FiCaSchedulerNode getMockNodeForPortsCaculate(String host, + String rack, int port, int mem, int vCores, ValueRanges ports, + Configuration conf) { + NodeId nodeId = mock(NodeId.class); + when(nodeId.getHost()).thenReturn(host); + when(nodeId.getPort()).thenReturn(port); + RMContext rmContext = mock(RMContext.class); + when(rmContext.getYarnConfiguration()).thenReturn(conf); + Node mockNode = mock(Node.class); + when(mockNode.getNetworkLocation()).thenReturn(rack); + RMNode rmNode = + new RMNodeImpl( + nodeId, + rmContext, + host, + 0, + 0, + mockNode, + Resources.createResource(mem, vCores, ports), ""); + FiCaSchedulerNode node = spy(new FiCaSchedulerNode(rmNode, false)); + LOG.info("node = " + host + " avail=" + node.getUnallocatedResource()); + return node; + } + + @SuppressWarnings("deprecation") public static ContainerId getMockContainerId(FiCaSchedulerApp application) { ContainerId containerId = mock(ContainerId.class);