commit eb76527e7901a5386c4aefd6cf557f620bdec0b2 Author: Wangda Tan Date: Sat Oct 1 09:57:13 2016 -0700 changes for ResourceUsage diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java index 0d31b6f..c2dee83 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java @@ -171,4 +171,13 @@ public boolean equals(Object obj) { public String toString() { return ""; } + + @Override + public int compareTo(Resource other) { + long diff = this.getMemorySize() - other.getMemorySize(); + if (diff == 0) { + diff = this.getVirtualCores() - other.getVirtualCores(); + } + return diff == 0 ? 0 : (diff > 0 ? 1 : -1); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java index 6686696..e0865c9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java @@ -89,15 +89,4 @@ public void setVirtualCores(int vCores) { maybeInitBuilder(); builder.setVirtualCores(vCores); } - - @Override - public int compareTo(Resource other) { - long diff = this.getMemorySize() - other.getMemorySize(); - if (diff == 0) { - diff = this.getVirtualCores() - other.getVirtualCores(); - } - return diff == 0 ? 0 : (diff > 0 ? 1 : -1); - } - - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AtomicResource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AtomicResource.java new file mode 100644 index 0000000..4a01a4b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AtomicResource.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.yarn.api.records.Resource; + +public class AtomicResource extends Resource { + private volatile long mem; + private volatile int virtualCores; + + public AtomicResource() { + this(0L, 0); + } + + public AtomicResource(long mem, int virtualCores) { + this.mem = mem; + this.virtualCores = virtualCores; + } + + @Override + @Deprecated + public int getMemory() { + return (int) getMemorySize(); + } + + @Override + @Deprecated + public void setMemory(int memory) { + setMemorySize(memory); + } + + @Override + public long getMemorySize() { + return mem; + } + + @Override + public void setMemorySize(long memory) { + this.mem = memory; + } + + @Override + public int getVirtualCores() { + return virtualCores; + } + + @Override + public void setVirtualCores(int vCores) { + this.virtualCores = vCores; + } + + public synchronized void inc(Resource res) { + if (null == res) { + return; + } + this.mem += res.getMemorySize(); + this.virtualCores += res.getVirtualCores(); + } + + public synchronized void dec(Resource res) { + if (null == res) { + return; + } + this.mem -= res.getMemorySize(); + this.virtualCores -= res.getVirtualCores(); + } + + public synchronized void set(Resource res) { + if (null == res) { + this.mem = 0L; + this.virtualCores = 0; + return; + } + this.mem = res.getMemorySize(); + this.virtualCores = res.getVirtualCores(); + } + + @Override + public int compareTo(Resource o) { + return 0; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java index 2857379..6d91e59 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java @@ -18,19 +18,16 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; - import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.util.resource.Resources; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + /** * Resource Usage by Labels for following fields by label - AM resource (to * enforce max-am-resource-by-label after YARN-2637) - Used resource (includes @@ -41,18 +38,12 @@ * And it is thread-safe */ public class ResourceUsage { - private ReadLock readLock; - private WriteLock writeLock; private Map usages; // short for no-label :) private static final String NL = CommonNodeLabelsManager.NO_LABEL; public ResourceUsage() { - ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - readLock = lock.readLock(); - writeLock = lock.writeLock(); - - usages = new HashMap(); + usages = new HashMap<>(); usages.put(NL, new UsageByLabel(NL)); } @@ -65,19 +56,19 @@ public ResourceUsage() { private int idx; - private ResourceType(int value) { + ResourceType(int value) { this.idx = value; } } private static class UsageByLabel { // usage by label, contains all UsageType - private Resource[] resArr; + private AtomicResource[] resArr; public UsageByLabel(String label) { - resArr = new Resource[ResourceType.values().length]; + resArr = new AtomicResource[ResourceType.values().length]; for (int i = 0; i < resArr.length; i++) { - resArr[i] = Resource.newInstance(0, 0); + resArr[i] = new AtomicResource(0, 0); }; } @@ -144,14 +135,9 @@ public void setUsed(Resource res) { setUsed(NL, res); } - public void copyAllUsed(ResourceUsage other) { - try { - writeLock.lock(); - for (Entry entry : other.usages.entrySet()) { - setUsed(entry.getKey(), Resources.clone(entry.getValue().getUsed())); - } - } finally { - writeLock.unlock(); + public synchronized void copyAllUsed(ResourceUsage other) { + for (Entry entry : other.usages.entrySet()) { + setUsed(entry.getKey(), Resources.clone(entry.getValue().getUsed())); } } @@ -326,31 +312,21 @@ private Resource _get(String label, ResourceType type) { if (label == null) { label = RMNodeLabelsManager.NO_LABEL; } - - try { - readLock.lock(); - UsageByLabel usage = usages.get(label); - if (null == usage) { - return Resources.none(); - } - return normalize(usage.resArr[type.idx]); - } finally { - readLock.unlock(); + + UsageByLabel usage = usages.get(label); + if (null == usage) { + return Resources.none(); } + return normalize(usage.resArr[type.idx]); } - private Resource _getAll(ResourceType type) { - try { - readLock.lock(); - Resource allOfType = Resources.createResource(0); - for (Map.Entry usageEntry : usages.entrySet()) { - //all usages types are initialized - Resources.addTo(allOfType, usageEntry.getValue().resArr[type.idx]); - } - return allOfType; - } finally { - readLock.unlock(); + private synchronized Resource _getAll(ResourceType type) { + Resource allOfType = Resources.createResource(0); + for (Map.Entry usageEntry : usages.entrySet()) { + //all usages types are initialized + Resources.addTo(allOfType, usageEntry.getValue().resArr[type.idx]); } + return allOfType; } public Resource getAllPending() { @@ -366,72 +342,46 @@ private UsageByLabel getAndAddIfMissing(String label) { label = RMNodeLabelsManager.NO_LABEL; } if (!usages.containsKey(label)) { - UsageByLabel u = new UsageByLabel(label); - usages.put(label, u); - return u; + synchronized (this){ + if (!usages.containsKey(label)) { + UsageByLabel u = new UsageByLabel(label); + usages.put(label, u); + return u; + } + } } return usages.get(label); } private void _set(String label, ResourceType type, Resource res) { - try { - writeLock.lock(); - UsageByLabel usage = getAndAddIfMissing(label); - usage.resArr[type.idx] = res; - } finally { - writeLock.unlock(); - } + UsageByLabel usage = getAndAddIfMissing(label); + usage.resArr[type.idx].set(res); } private void _inc(String label, ResourceType type, Resource res) { - try { - writeLock.lock(); UsageByLabel usage = getAndAddIfMissing(label); - Resources.addTo(usage.resArr[type.idx], res); - } finally { - writeLock.unlock(); - } + usage.resArr[type.idx].inc(res); } private void _dec(String label, ResourceType type, Resource res) { - try { - writeLock.lock(); - UsageByLabel usage = getAndAddIfMissing(label); - Resources.subtractFrom(usage.resArr[type.idx], res); - } finally { - writeLock.unlock(); - } + UsageByLabel usage = getAndAddIfMissing(label); + usage.resArr[type.idx].dec(res); } public Resource getCachedDemand(String label) { - try { - readLock.lock(); - Resource demand = Resources.createResource(0); - Resources.addTo(demand, getCachedUsed(label)); - Resources.addTo(demand, getCachedPending(label)); - return demand; - } finally { - readLock.unlock(); - } + Resource demand = Resources.createResource(0); + Resources.addTo(demand, getCachedUsed(label)); + Resources.addTo(demand, getCachedPending(label)); + return demand; } @Override - public String toString() { - try { - readLock.lock(); - return usages.toString(); - } finally { - readLock.unlock(); - } + public synchronized String toString() { + return usages.toString(); } - + public Set getNodePartitionsSet() { - try { - readLock.lock(); - return usages.keySet(); - } finally { - readLock.unlock(); - } + return usages.keySet(); } }