From 8ffa26a3e87a953c46894eaaca78149b5444b8fc Mon Sep 17 00:00:00 2001 From: Guang Yang Date: Tue, 8 Sep 2020 16:55:09 -0700 Subject: [PATCH] YARN-10428: fix zombie jobs using FAIR and size based weight queue For completed application, both the "used" and "demand" resources are zero, in which case calculating the magnitude of the schedulable entity would run into divide-by-zero issue. This results in magnitude being 'NaN', which in turn make the comparator behavior not tansitive. As a result, "remove" might not be able to remove the entity from the TreeSet. The fix is to avoid divide-by-zero and make the comparator transitive. Test Plan A unit test is added; the test adds 10 schedulable entities, mark 5 of them as completed by setting the resources to 0, and then removes 5 of them. We expect 5 entities to be removed. Without the fix, the unit test consistently fails. --- .../scheduler/policy/FairOrderingPolicy.java | 5 ++- .../scheduler/policy/TestFairOrderingPolicy.java | 39 ++++++++++++++++++++++ 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java index 9e66582e04f..127c67e820e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java @@ -100,9 +100,12 @@ public FairOrderingPolicy() { private double getMagnitude(SchedulableEntity r) { double mag = r.getSchedulingResourceUsage().getCachedUsed( CommonNodeLabelsManager.ANY).getMemorySize(); - if (sizeBasedWeight) { + if (sizeBasedWeight && mag != 0) { double weight = Math.log1p(r.getSchedulingResourceUsage().getCachedDemand( CommonNodeLabelsManager.ANY).getMemorySize()) / Math.log(2); + // "weight" captures both used and demanding resources, so it could + // be zero only when "mag" (or used resource) is zero, in which case + // this branch wouldn't be reached mag = mag / weight; } return mag; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java index d51f9f5a250..4c8a1dd2f4c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java @@ -23,6 +23,7 @@ import java.util.*; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData; @@ -104,6 +105,44 @@ public void testSizeBasedWeight() { } @Test + public void testSizeBasedWeightWithCompletedJob() { + FairOrderingPolicy policy = + new FairOrderingPolicy(); + policy.setSizeBasedWeight(true); + + // Add 10 different schedulable entities + List entities = new ArrayList<>(10); + for (int i = 1; i <= 10; i++) { + MockSchedulableEntity r = new MockSchedulableEntity(); + r.setApplicationPriority(Priority.newInstance(i)); + r.setUsed(Resources.createResource(4 * i)); + r.setPending(Resources.createResource(4 * i)); + AbstractComparatorOrderingPolicy.updateSchedulingResourceUsage( + r.getSchedulingResourceUsage()); + policy.addSchedulableEntity(r); + entities.add(r); + } + + // Mark the first 5 entities as completed by setting + // the resources to 0 + for (int i = 0; i < 5; i++) { + MockSchedulableEntity r = entities.get(i); + r.setUsed(Resources.createResource(0)); + r.setPending(Resources.createResource(0)); + policy.entityRequiresReordering(r); + } + + policy.reorderScheduleEntities(); + + // Remove the first 5 elements + for (int i = 0; i < 5; i ++) { + policy.removeSchedulableEntity(entities.get(i)); + } + + Assert.assertEquals(5, policy.getNumSchedulableEntities()); + } + + @Test public void testIterators() { OrderingPolicy schedOrder = new FairOrderingPolicy(); -- 2.13.2