diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 2901134..dc8d154 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.collect.ImmutableSet; +import com.google.common.annotations.VisibleForTesting; /** @@ -294,4 +295,20 @@ public synchronized void recoverContainer(RMContainer rmContainer) { public void updateLabels(Set labels) { this.labels = labels; } + + /** + * Deduct unallocated resources from the node. This is used when allocating a + * container. + * @param resource Resources to deduct. + */ + @VisibleForTesting + public synchronized void deductUnallocatedResource(Resource resource) { + if (resource == null) { + LOG.error("Invalid deduction of null resource for " + + rmNode.getNodeAddress()); + return; + } + Resources.addTo(availableResource, resource); + Resources.subtractFrom(usedResource, resource); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 2e7cb6c..1422d74 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -287,6 +287,18 @@ public void run() { } catch (InterruptedException e) { LOG.warn("Continuous scheduling thread interrupted. Exiting.", e); return; + } catch (IllegalArgumentException e) { + LOG.warn("Continuous scheduling thread IllegalArgumentException " + + "and will retry again.", e); + try { + Thread.sleep(getContinuousSchedulingSleepMs()); + } catch (InterruptedException ex) { + LOG.info("Continuous scheduling thread " + + "sleep exception.",e); + } + } catch (Exception e) { + LOG.error("Exception in fair scheduler " + + "ContinuousSchedulingThread.", e); } } } @@ -822,7 +834,13 @@ protected synchronized void completedContainer(RMContainer rmContainer, application.unreserve(rmContainer.getReservedPriority(), node); } else { application.containerCompleted(rmContainer, containerStatus, event); - node.releaseContainer(container); + if (continuousSchedulingEnabled) { + synchronized (this) { + node.releaseContainer(container); + } + } else { + node.releaseContainer(container); + } updateRootQueueMetrics(); } @@ -1714,4 +1732,12 @@ private String handleMoveToPlanQueue(String targetQueueName) { } return targetQueueName; } + + /** + * this method is just used for test. + */ + @VisibleForTesting + public Map getNodes(){ + return nodes; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java index a72e393..7705f84 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java @@ -37,12 +37,14 @@ import org.junit.After; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import org.junit.Before; import org.junit.Test; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.ThreadLocalRandom; public class TestContinuousScheduling extends FairSchedulerTestBase { private MockClock mockClock; @@ -114,4 +116,38 @@ public void testSchedulingDelay() throws InterruptedException { } assertEquals(1024, app.getCurrentConsumption().getMemory()); } + + @Test + public void TestNodeAvailableResourceComparatorTransitivity() { + List rmNodes = + MockNodes.newNodes(2, 4000, Resource.newInstance(4096, 4)); + for (RMNode rmNode : rmNodes) { + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(rmNode); + scheduler.handle(nodeEvent1); + } + // To simulate unallocated resource changes + new Thread() { + @Override + public void run() { + for (int j = 0; j < 1000; j++) { + for (FSSchedulerNode node : scheduler.getNodes().values()) { + int i = ThreadLocalRandom.current().nextInt(-30, 0); + // if disabled synchronized scheduler will occurr exception + // 'Comparison method violates its general contract' + synchronized (scheduler) { + Resource aa=Resource.newInstance(i *(1024), i); + node.deductUnallocatedResource(aa); + } + } + } + } + }.start(); + try { + for (int j = 0; j < 10; j++) { + scheduler.continuousSchedulingAttempt(); + } + } catch (Exception e) { + fail(e.getMessage()); + } + } }